Skip to content

Commit

Permalink
Removed debugging code and commented out code blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Feb 26, 2021
1 parent 0fc84d5 commit d10f202
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 80 deletions.
14 changes: 0 additions & 14 deletions apscheduler/datastores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,26 +209,12 @@ async def release_schedules(self, scheduler_id: str, schedules: List[Schedule])
else:
finished_schedule_ids.append(s.id)

# if updated_ids:
# next_fire_time = min(s.next_fire_time for s in schedules
# if s.next_fire_time is not None)
# event = ScheduleUpdated(datetime.now(timezone.utc), updated_ids, next_fire_time)
# await self.publish(event)
# old_event, self._schedules_event = self._schedules_event, create_event()
# await old_event.set()

for event in update_events:
await self.publish(event)

# Remove schedules that didn't get a new next fire time
await self.remove_schedules(finished_schedule_ids)

# async def get_next_fire_time(self) -> Optional[datetime]:
# for schedule in self._schedules:
# return schedule.next_fire_time
#
# return None

async def add_job(self, job: Job) -> None:
state = JobState(job)
self._jobs.append(state)
Expand Down
4 changes: 0 additions & 4 deletions apscheduler/datastores/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
from asyncio import current_task
from datetime import datetime, timezone
from typing import Iterable, List, Optional, Set
from uuid import UUID
Expand Down Expand Up @@ -55,7 +54,6 @@ async def __aenter__(self):

self._loans += 1
if self._loans == 1 and self.notify_channel:
print('entering postgresql data store in task', id(current_task()))
self._task_group = create_task_group()
await self._task_group.__aenter__()
await self._task_group.spawn(self._listen_notifications)
Expand All @@ -66,7 +64,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
assert self._loans
self._loans -= 1
if self._loans == 0 and self.notify_channel:
print('exiting postgresql data store in task', id(current_task()))
await self._task_group.cancel_scope.cancel()
await self._task_group.__aexit__(exc_type, exc_val, exc_tb)
del self._schedules_event
Expand Down Expand Up @@ -293,7 +290,6 @@ async def get_jobs(self, ids: Optional[Iterable[UUID]] = None) -> List[Job]:

async def acquire_jobs(self, worker_id: str, limit: Optional[int] = None) -> List[Job]:
while True:
print('acquiring jobs')
jobs: List[Job] = []
async with self.pool.acquire() as conn, conn.transaction():
now = datetime.now(timezone.utc)
Expand Down
12 changes: 0 additions & 12 deletions apscheduler/schedulers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ async def __aenter__(self):
await self._task_group.spawn(self.run, start_event)
await start_event.wait()

# await self.start(self._task_group)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
Expand Down Expand Up @@ -170,24 +169,13 @@ async def run(self, start_event: Optional[Event] = None) -> None:
job = Job(taskdef.id, taskdef.func, schedule.args, schedule.kwargs,
schedule.id, fire_time, schedule.next_deadline,
schedule.tags)
print('Added job', job.id, 'for schedule', schedule)
await self.data_store.add_job(job)

self.logger.debug('Releasing %d schedules', len(schedules))
await self.data_store.release_schedules(self.identity, schedules)

await self._stop_event.set()
del self._stop_event

# async def start(self, task_group: TaskGroup, *, reset_datastore: bool = False) -> None:
# start_event = create_event()
# await task_group.spawn(self.run, start_event)
# await start_event.wait()
#
# if self.start_worker:
# self._worker = AsyncWorker(self.data_store)
# await self._worker.start(task_group)

async def stop(self, force: bool = False) -> None:
self._running = False
if self._worker:
Expand Down
42 changes: 0 additions & 42 deletions apscheduler/schedulers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,6 @@ def __enter__(self):

return self

# def __exit__(self, exc_type, exc_val, exc_tb):
# return self._exit_stack.__exit__(exc_type, exc_val, exc_tb)

# def __enter__(self) -> SyncScheduler:
# self.start()
# return self
#
# def __exit__(self, exc_type, exc_val, exc_tb):
# self.stop(force=exc_type is not None)

# @property
# def worker(self) -> AsyncWorker:
# return self._scheduler.worker

@property
def data_store(self) -> DataStore:
return self._scheduler.data_store
Expand All @@ -79,31 +65,3 @@ def stop(self) -> None:

def wait_until_stopped(self) -> None:
self.portal.call(self._scheduler.wait_until_stopped)

# def start(self):
# if not self._scheduler._running:
# if not self.portal:
# self.portal = start_blocking_portal()
# self._shutdown_portal = True
#
# self.portal.call(self._scheduler.data_store.initialize)
# start_event = self.portal.call(create_event)
# self._task = self.portal.spawn_task(self._scheduler.run, start_event)
# self.portal.call(start_event.wait)
# print('start_event wait finished')
#
# if self._scheduler.start_worker:
# self._worker = SyncWorker(self.data_store, portal=self.portal)
# self._worker.start()
#
# def stop(self, force: bool = False) -> None:
# if self._worker:
# self._worker.stop(force)
#
# if self._scheduler._running:
# self._scheduler._running = False
# try:
# self.portal.call(partial(self._scheduler.stop, force=force))
# finally:
# if self._shutdown_portal:
# self.portal.stop_from_external_thread(cancel_remaining=force)
8 changes: 0 additions & 8 deletions apscheduler/workers/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def __init__(self, data_store: DataStore, *, max_concurrent_jobs: int = 100,
raise ValueError('max_concurrent_jobs must be at least 1')

async def __aenter__(self):
print('entering worker in task', id(current_task()))
await self._exit_stack.__aenter__()

# Initialize the data store
Expand All @@ -57,7 +56,6 @@ async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print('exiting worker in task', id(current_task()))
await self.stop(force=exc_type is not None)
await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)

Expand Down Expand Up @@ -139,12 +137,6 @@ async def _call_job_func(self, func: Callable, args: tuple, kwargs: Dict[str, An

return return_value

# async def start(self, task_group: TaskGroup) -> None:
# start_event = create_event()
# print('spawning task for AsyncWorker.run() in task', id(current_task()))
# await task_group.spawn(self.run, start_event)
# await start_event.wait()

async def stop(self, force: bool = False) -> None:
self._running = False
if self._acquire_cancel_scope:
Expand Down

0 comments on commit d10f202

Please sign in to comment.