Bug when stopping scheduler with jobs running - APScheduler v4 #946

Open
HK-Mattew opened this issue Jul 28, 2024 · 16 comments

@HK-Mattew
Contributor

Things to check first

  • I have checked that my issue does not already have a solution in the FAQ

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

Version

4.0.0a5

What happened?

Hello,

I would like to report a bug that occurs after stopping the Scheduler. From what I have noticed, the bug occurs when I stop the scheduler while there is a job running.

Summary: While a job is still running, I call the scheduler's .stop() method and wait until scheduler.state reaches the stopped state. By the time it reaches RunState.stopped, the job that was running before I stopped the scheduler has completed successfully. However, some operations are left pending in the DataStore: the task document's running_jobs field is not decremented, and the job is not deleted from the jobs collection.

Tested only with MongoDBDataStore

How can we reproduce the bug?

Code to replicate the bug:

from apscheduler import (
    Scheduler,
    SchedulerRole,
    TaskDefaults,
    RunState
)
from apscheduler.datastores.mongodb import (
    MongoDBDataStore
)
from apscheduler.triggers.interval import IntervalTrigger

from apscheduler.executors.thread import ThreadPoolJobExecutor
from datetime import datetime
import time

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.INFO)



MONGO_DB_URI = '<your-mongodb-uri>'
DB_NAME_FOR_APS = 'tryreplybugaps'


scheduler_web_configs = dict(
    data_store=MongoDBDataStore(
        client_or_uri=MONGO_DB_URI,
        database=DB_NAME_FOR_APS
    ),
    role=SchedulerRole.both,
    max_concurrent_jobs=100,
    job_executors={
        'threadpool': ThreadPoolJobExecutor()
    },
    task_defaults=TaskDefaults(
        job_executor='threadpool',
        max_running_jobs=1
    )
)

with Scheduler(**scheduler_web_configs) as scheduler:
    

    def my_job_func() -> None:

        print('In a lengthy process...')
        time.sleep(10)
        print('Long process completed.')

        return None


    scheduler.add_schedule(
        my_job_func, # This will run in `threadpool` as specified in TaskDefaults.
        trigger=IntervalTrigger(
            hours=1,
            start_time=datetime.now()
        )
    )

    scheduler.start_in_background()


    while len(scheduler._async_scheduler._running_jobs) == 0:
        print('Waiting for some job to start running...')
        time.sleep(5)
    

    print('Stopping the scheduler when there are still jobs running...', scheduler._async_scheduler._running_jobs)
    scheduler.stop()


    while scheduler.state != RunState.stopped:
        print('Waiting for the scheduler to be in the stopped state...')
        time.sleep(2)


    assert len(scheduler._async_scheduler._running_jobs) == 0


    datastore_tasks = scheduler.get_tasks() # The task continues with the field running_jobs=1
    datastore_jobs = scheduler.get_jobs() # The job was not deleted after execution.

    print('Tasks:', datastore_tasks)
    print('Jobs:', datastore_jobs)

    """
    In my case, the execution went well. The job was completed.

    However, in the task document in the database, the task still has running_jobs=1,
    even though the job is no longer executing. This will prevent the schedule from
    starting new runs on the next program start, since I use max_running_jobs=1.

    And in the jobs collection, the job document remains in the database. I believe it should have been deleted, right?
    """

My logs from running the sample code:

INFO:apscheduler._schedulers.async_:Added new schedule (task='__main__:my_job_func', trigger=IntervalTrigger(hours=1, start_time='2024-07-28 18:58:32.458838-03:00')); next run time at 2024-07-28 18:58:32.458838-03:00
INFO:apscheduler._schedulers.async_:Scheduler started
Waiting for some job to start running...
In a lengthy process...
Stopping the scheduler when there are still jobs running... {Job(id=UUID('1503845e-75bf-4c90-a583-b0bc138a94ce'), task_id='__main__:my_job_func', schedule_id='a3325d5d-1ff6-4ff4-8ca8-e8ad934ac782', scheduled_fire_time=datetime.datetime(2024, 7, 28, 18, 58, 32, 458838, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=75600))), executor='threadpool', created_at=datetime.datetime(2024, 7, 28, 21, 58, 33, 230771, tzinfo=datetime.timezone.utc))}
Waiting for the scheduler to be in the stopped state...
Waiting for the scheduler to be in the stopped state...
Waiting for the scheduler to be in the stopped state...
Waiting for the scheduler to be in the stopped state...
Long process completed.
INFO:apscheduler._schedulers.async_:Job 1503845e-75bf-4c90-a583-b0bc138a94ce completed successfully
INFO:apscheduler._schedulers.async_:Scheduler stopped
Tasks: [
    Task(
        id='__main__:my_job_func',
        func='__main__:my_job_func',
        job_executor='threadpool',
        max_running_jobs=1,
        misfire_grace_time=None,
        running_jobs=1
        )
    ]
Jobs: [
    Job(
        id=UUID('1503845e-75bf-4c90-a583-b0bc138a94ce'),
        task_id='__main__:my_job_func',
        schedule_id='a3325d5d-1ff6-4ff4-8ca8-e8ad934ac782',
        scheduled_fire_time=datetime.datetime(2024, 7, 28, 18, 58, 32, 458838,
        tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=75600))),
        executor='threadpool',
        created_at=datetime.datetime(2024, 7, 28, 21, 58, 33, 230771, tzinfo=datetime.timezone.utc)
    )
    ]
@HK-Mattew added the bug label Jul 28, 2024
@agronholm
Owner

I see two problems in the code:

  1. The stop() method is a bit too forceful (by default), and should probably not cancel the scheduler's task group
  2. The job acquisition operation isn't exactly atomic on MongoDB (I don't know how to actually accomplish that given its nature), but a cancellation will abort the operation halfway through.

@HK-Mattew
Contributor Author

  1. The stop() method is a bit too forceful (by default), and should probably not cancel the scheduler's task group

However, the expected behavior when the job is completed normally is:

  • Acquire job
  • Execute job
  • Delete job
  • In the task, decrement the running_jobs field

However, in my case, I called Scheduler.stop before the job was completed. The Scheduler then showed me the log Job ...id... completed successfully, so the job did complete. However, the job was not deleted from the datastore and the task did not have its running_jobs field decremented.

My question is: what is causing the scheduler not to delete the job at the end of its execution, and why did it not decrement the task's running_jobs field?

2. The job acquisition operation isn't exactly atomic on MongoDB (I don't know how to actually accomplish that given its nature), but a cancellation will abort the operation halfway through.

In pymongo you can use transactions within a session. But I don't think that's necessary in this case, because the task was not cancelled and did not get an error. The job completed successfully.

If I'm overlooking anything, please let me know.
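
For reference, a minimal sketch of the pymongo session/transaction API mentioned above (this assumes the client is connected to a replica set, since MongoDB transactions require one; the database, collection names and IDs are only illustrative):

from pymongo import MongoClient

client = MongoClient('<your-mongodb-uri>')  # must point at a replica set or mongos

with client.start_session() as session:
    with session.start_transaction():
        # Operations that share the same session are committed or aborted together.
        client['tryreplybugaps']['jobs'].delete_one(
            {'_id': '<job-id>'}, session=session
        )
        client['tryreplybugaps']['tasks'].update_one(
            {'_id': '<task-id>'},
            {'$inc': {'running_jobs': -1}},
            session=session,
        )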

@agronholm
Owner

In pymongo you can use transactions within a session. But I don't think that's necessary in this case, because the task was not cancelled and did not get an error. The job completed successfully.

Transactions don't work on a single node Mongo server.

@agronholm
Owner

My question is: what is causing the scheduler not to delete the job at the end of its execution, and why did it not decrement the task's running_jobs field?

Because stop() currently cancels all the task groups within the scheduler, and there is no shielding to prevent the job release operation from being cancelled.
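
For illustration, a minimal sketch (not APScheduler's actual code) of how a release operation could be shielded from cancellation with AnyIO; the release_job() argument list here is a placeholder:

from anyio import CancelScope

async def release_after_run(data_store, job, result):
    # Even if the surrounding task group is being cancelled by stop(), the shielded
    # scope lets the release operation run to completion before the task exits.
    with CancelScope(shield=True):
        await data_store.release_job(job, result)  # hypothetical arguments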

@HK-Mattew
Contributor Author

In pymongo you can use transactions within a session. But I don't think that's necessary in this case, because the task was not cancelled and did not get an error. The job completed successfully.

Transactions don't work on a single node Mongo server.

You are right. So that would be the difficulty because not everyone uses MongoDB with more than one node :/

I use MongoDB with more than one node. So I think I would have to try to adapt something like MongoDBDataStore(allow_transaction=True)

@HK-Mattew
Contributor Author

My question is: what is causing the scheduler not to delete the job at the end of its execution, and why did it not decrement the task's running_jobs field?

Because stop() currently cancels all the task groups within the scheduler, and there is no shielding to prevent the job release operation from being cancelled.

I understood

@agronholm
Owner

In pymongo you can use transactions within a session. But I don't think that's necessary in this case, because the task was not cancelled and did not get an error. The job completed successfully.

Transactions don't work on a single node Mongo server.

You are right. So that would be the difficulty because not everyone uses MongoDB with more than one node :/

I use MongoDB with more than one node. So I think I would have to try to adapt something like MongoDBDataStore(allow_transaction=True)

How would that help users with just one node?

@HK-Mattew
Contributor Author

allow_transaction

Unfortunately this would still be a problem, since transactions would only work for those who used allow_transaction=True.

@HK-Mattew
Contributor Author

I found a temporary solution to the problem I'm facing: save the jobs that were running when the scheduler stopped and call the .release_job method on them manually, like this:

import asyncio

asyncio.run(
    scheduler.data_store.release_job(
        ...
    )
)

This worked correctly even after the scheduler stopped.

@agronholm
Owner

agronholm commented Jul 29, 2024

I found a temporary solution to the problem I'm facing: save the jobs that were running when the scheduler stopped and call the .release_job method on them manually, like this:

import asyncio

asyncio.run(
    scheduler.data_store.release_job(
        ...
    )
)

This worked correctly even after the scheduler stopped.

This is a dangerous looking "fix". You should be aware that I'm currently in the process of refactoring the stop() method to allow the scheduler to shut down more gracefully, allowing jobs to complete properly if they do so within the allotted time. I'm also considering shielding the release operations from CancelScope cancellation if that looks like it makes sense.

@HK-Mattew
Contributor Author

This is a dangerous looking "fix". You should be aware that I'm currently in the process of refactoring the stop() method to allow the scheduler to shut down more gracefully, allowing jobs to complete properly if they do so within the allotted time. I'm also considering shielding the release operations from CancelScope cancellation if that looks like it makes sense.

It's not really the best solution. But it would help temporarily.

Your idea about CancelScope sounds good. I hope it works well :)

[Another idea]
One idea I had would be to work with signals in the Scheduler.

Example:

class Scheduler:
    ...

scheduler = Scheduler()

# Hypothetical API: tell the scheduler to stop starting new jobs.
scheduler.send_signal('stop running new jobs')

# I wait until no jobs are running in the scheduler and then use the scheduler.stop() method.
# With this, the scheduler would still be able to process the job deletion operations after
# the jobs finish, and also decrement the tasks' running_jobs field.
assert len(scheduler._async_scheduler._running_jobs) == 0

scheduler.stop()

This seems like a good solution to the current problem.

@agronholm
Owner

The stop() method already sets the scheduler state to stopping which signals to the background tasks that they should exit their respective loops. Unfortunately, currently there are background tasks which sleep for certain periods of time, and I have to find a way to safely interrupt these tasks in order to allow their task groups to exit.
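
One common pattern for making such sleeps interruptible (a sketch using AnyIO primitives, not the scheduler's actual implementation) is to wait on an exit event with a timeout instead of sleeping unconditionally:

from anyio import Event, move_on_after

async def background_loop(exit_event: Event, interval: float) -> None:
    while not exit_event.is_set():
        ...  # do one iteration of work
        with move_on_after(interval):
            # Wakes up immediately when exit_event is set; otherwise times out
            # after `interval` seconds and loops again.
            await exit_event.wait()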

@HK-Mattew
Contributor Author

The stop() method already sets the scheduler state to stopping which signals to the background tasks that they should exit their respective loops. Unfortunately, currently there are background tasks which sleep for certain periods of time, and I have to find a way to safely interrupt these tasks in order to allow their task groups to exit.

I just checked and, indeed, the AsyncScheduler._process_jobs method has a condition to only run with RunState.started.

So, I think the best bet would be your idea about CancelScope.

Man, I'd like to take this opportunity to thank you for your great work. This new version of APScheduler is looking amazing. I really like it. 😉

@agronholm
Owner

So, I think the best bet would be your idea about CancelScope.

I'm not sure we're on the same page here. I brought up CancelScope because those could be used to shield certain sensitive operations (like releasing a job) from cancellation.

Man, I'd like to take this opportunity to thank you for your great work. This new version of APScheduler is looking amazing. I really like it. 😉

Thanks! Always nice to see one's work appreciated!

@neeraj9

neeraj9 commented Dec 4, 2024

I noticed that with the PostgreSQL data store, if the scheduler is terminated abruptly because the program crashes or is killed, running_jobs is left as is. There should be reconciliation logic when the scheduler starts, to align the jobs (in the jobs table) with running_jobs in the tasks table.

I am using the async job_executor with max_running_jobs greater than 1 to allow many jobs per task.
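
For illustration, a rough sketch of the kind of startup reconciliation being suggested here; the table and column names (tasks, jobs, running_jobs, acquired_by) and the connection URL are assumptions about the SQLAlchemy data store's schema and may not match it exactly:

from sqlalchemy import create_engine, text

engine = create_engine('postgresql+psycopg://user:pass@localhost/apscheduler')
with engine.begin() as conn:
    # Recompute each task's running_jobs from the jobs still marked as acquired.
    conn.execute(text(
        """
        UPDATE tasks
        SET running_jobs = (
            SELECT count(*) FROM jobs
            WHERE jobs.task_id = tasks.id
              AND jobs.acquired_by IS NOT NULL
        )
        """
    ))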

@agronholm
Owner

Yeah, that was the intention. Things like this are the reason it's still in alpha.
