-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
26 lines (21 loc) · 827 Bytes
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import schedule
import time
from app.models import Base
from app.scheduler import job
from database.session import engine
from database.database_saver import dump_database
if __name__ == "__main__":
Base.metadata.create_all(engine)
schedule.every().day.at("12:00").do(job)
schedule.every().day.at("00:00").do(dump_database)
while True:
now = time.localtime()
next_run = schedule.next_run()
next_run_timetuple = next_run.timetuple() if next_run else now
sleep_duration = time.mktime(next_run_timetuple) - time.mktime(now)
if sleep_duration > 0:
print(f'Next run scheduled at: {next_run.strftime("%Y-%m-%d %H:%M:%S")}')
time.sleep(sleep_duration)
else:
print('Running scheduled tasks...')
schedule.run_pending()