diff --git a/apscheduler/datastores/memory.py b/apscheduler/datastores/memory.py index 826981a9..7ba38fcf 100644 --- a/apscheduler/datastores/memory.py +++ b/apscheduler/datastores/memory.py @@ -209,26 +209,12 @@ async def release_schedules(self, scheduler_id: str, schedules: List[Schedule]) else: finished_schedule_ids.append(s.id) - # if updated_ids: - # next_fire_time = min(s.next_fire_time for s in schedules - # if s.next_fire_time is not None) - # event = ScheduleUpdated(datetime.now(timezone.utc), updated_ids, next_fire_time) - # await self.publish(event) - # old_event, self._schedules_event = self._schedules_event, create_event() - # await old_event.set() - for event in update_events: await self.publish(event) # Remove schedules that didn't get a new next fire time await self.remove_schedules(finished_schedule_ids) - # async def get_next_fire_time(self) -> Optional[datetime]: - # for schedule in self._schedules: - # return schedule.next_fire_time - # - # return None - async def add_job(self, job: Job) -> None: state = JobState(job) self._jobs.append(state) diff --git a/apscheduler/datastores/postgresql.py b/apscheduler/datastores/postgresql.py index c4e81e28..19492644 100644 --- a/apscheduler/datastores/postgresql.py +++ b/apscheduler/datastores/postgresql.py @@ -1,6 +1,5 @@ import asyncio import logging -from asyncio import current_task from datetime import datetime, timezone from typing import Iterable, List, Optional, Set from uuid import UUID @@ -55,7 +54,6 @@ async def __aenter__(self): self._loans += 1 if self._loans == 1 and self.notify_channel: - print('entering postgresql data store in task', id(current_task())) self._task_group = create_task_group() await self._task_group.__aenter__() await self._task_group.spawn(self._listen_notifications) @@ -66,7 +64,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): assert self._loans self._loans -= 1 if self._loans == 0 and self.notify_channel: - print('exiting postgresql data store in task', id(current_task())) await self._task_group.cancel_scope.cancel() await self._task_group.__aexit__(exc_type, exc_val, exc_tb) del self._schedules_event @@ -293,7 +290,6 @@ async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]: async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]: while True: - print('acquiring jobs') jobs: List[Job] = [] async with self.pool.acquire() as conn, conn.transaction(): now = datetime.now(timezone.utc) diff --git a/apscheduler/schedulers/async_.py b/apscheduler/schedulers/async_.py index 6448aa5c..85ae0818 100644 --- a/apscheduler/schedulers/async_.py +++ b/apscheduler/schedulers/async_.py @@ -58,7 +58,6 @@ async def __aenter__(self): await self._task_group.spawn(self.run, start_event) await start_event.wait() - # await self.start(self._task_group) return self async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -170,24 +169,13 @@ async def run(self, start_event: Optional[Event] = None) -> None: job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs, schedule.id, fire_time, schedule.next_deadline, schedule.tags) - print('Added job', job.id, 'for schedule', schedule) await self.data_store.add_job(job) - self.logger.debug('Releasing %d schedules', len(schedules)) await self.data_store.release_schedules(self.identity, schedules) await self._stop_event.set() del self._stop_event - # async def start(self, task_group: TaskGroup, *, reset_datastore: bool = False) -> None: - # start_event = create_event() - # await task_group.spawn(self.run, start_event) - # await start_event.wait() - # - # if self.start_worker: - # self._worker = AsyncWorker(self.data_store) - # await self._worker.start(task_group) - async def stop(self, force: bool = False) -> None: self._running = False if self._worker: diff --git a/apscheduler/schedulers/sync.py b/apscheduler/schedulers/sync.py index b56f6249..e405c8f7 100644 --- a/apscheduler/schedulers/sync.py +++ b/apscheduler/schedulers/sync.py @@ -43,20 +43,6 @@ def __enter__(self): return self - # def __exit__(self, exc_type, exc_val, exc_tb): - # return self._exit_stack.__exit__(exc_type, exc_val, exc_tb) - - # def __enter__(self) -> SyncScheduler: - # self.start() - # return self - # - # def __exit__(self, exc_type, exc_val, exc_tb): - # self.stop(force=exc_type is not None) - - # @property - # def worker(self) -> AsyncWorker: - # return self._scheduler.worker - @property def data_store(self) -> DataStore: return self._scheduler.data_store @@ -79,31 +65,3 @@ def stop(self) -> None: def wait_until_stopped(self) -> None: self.portal.call(self._scheduler.wait_until_stopped) - - # def start(self): - # if not self._scheduler._running: - # if not self.portal: - # self.portal = start_blocking_portal() - # self._shutdown_portal = True - # - # self.portal.call(self._scheduler.data_store.initialize) - # start_event = self.portal.call(create_event) - # self._task = self.portal.spawn_task(self._scheduler.run, start_event) - # self.portal.call(start_event.wait) - # print('start_event wait finished') - # - # if self._scheduler.start_worker: - # self._worker = SyncWorker(self.data_store, portal=self.portal) - # self._worker.start() - # - # def stop(self, force: bool = False) -> None: - # if self._worker: - # self._worker.stop(force) - # - # if self._scheduler._running: - # self._scheduler._running = False - # try: - # self.portal.call(partial(self._scheduler.stop, force=force)) - # finally: - # if self._shutdown_portal: - # self.portal.stop_from_external_thread(cancel_remaining=force) diff --git a/apscheduler/workers/async_.py b/apscheduler/workers/async_.py index 79499934..79010e1b 100644 --- a/apscheduler/workers/async_.py +++ b/apscheduler/workers/async_.py @@ -42,7 +42,6 @@ def __init__(self, data_store: DataStore, *, max_concurrent_jobs: int = 100, raise ValueError('max_concurrent_jobs must be at least 1') async def __aenter__(self): - print('entering worker in task', id(current_task())) await self._exit_stack.__aenter__() # Initialize the data store @@ -57,7 +56,6 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - print('exiting worker in task', id(current_task())) await self.stop(force=exc_type is not None) await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) @@ -139,12 +137,6 @@ async def _call_job_func(self, func: Callable, args: tuple, kwargs: Dict[str, An return return_value - # async def start(self, task_group: TaskGroup) -> None: - # start_event = create_event() - # print('spawning task for AsyncWorker.run() in task', id(current_task())) - # await task_group.spawn(self.run, start_event) - # await start_event.wait() - async def stop(self, force: bool = False) -> None: self._running = False if self._acquire_cancel_scope: