Skip to content

Commit

Permalink
global: implement Jobs backend
Browse files Browse the repository at this point in the history
  • Loading branch information
slint committed May 22, 2024
1 parent a2dd9a4 commit f4658ff
Show file tree
Hide file tree
Showing 12 changed files with 600 additions and 25 deletions.
21 changes: 21 additions & 0 deletions invenio_jobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,27 @@
JOBS_FACETS = {}
"""Facets/aggregations for Jobs results."""

# Queue registry keyed by queue name. Every entry repeats its name and carries
# a title and description wrapped in ``_()``.
JOBS_QUEUES = {
    "celery": dict(
        name="celery",
        title=_("Default"),
        description=_("Default queue"),
    ),
    "low": dict(
        name="low",
        title=_("Low"),
        description=_("Low priority queue"),
    ),
}
"""List of available Celery queues.
This doesn't create any of the queues, but just controls to which Celery queue a job
is pushed to. You still need to configure Celery workers to listen to these queues.
"""

JOBS_DEFAULT_QUEUE = None
"""Default Celery queue."""

JOBS_SORT_OPTIONS = {
"jobs": dict(
title=_("Jobs"),
Expand Down
21 changes: 21 additions & 0 deletions invenio_jobs/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

"""Jobs extension."""

from celery import current_app as current_celery_app
from flask import current_app
from invenio_i18n import gettext as _

from . import config
from .models import Task
from .resources import (
JobsResource,
JobsResourceConfig,
Expand Down Expand Up @@ -66,6 +69,24 @@ def init_resource(self, app):
TasksResourceConfig.build(app), self.tasks_service
)

@property
def queues(self):
    """Return the ``JOBS_QUEUES`` mapping from the current app's config."""
    app_config = current_app.config
    return app_config["JOBS_QUEUES"]

@property
def default_queue(self):
    """Return the name of the default queue.

    ``JOBS_DEFAULT_QUEUE`` from the app config is used when it holds a truthy
    value; otherwise Celery's ``task_default_queue`` setting is returned.
    """
    configured = current_app.config.get("JOBS_DEFAULT_QUEUE")
    if configured:
        return configured
    return current_celery_app.conf.task_default_queue

@property
def tasks(self):
    """Return the result of ``Task.all()``."""
    registered = Task.all()
    return registered


def finalize_app(app):
"""Finalize app."""
Expand Down
21 changes: 11 additions & 10 deletions invenio_jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
"""Models."""

import enum
import uuid
from inspect import signature

from celery import current_app
from celery import current_app as current_celery_app
from invenio_accounts.models import User
from invenio_db import db
from sqlalchemy.dialects import postgresql
Expand All @@ -29,15 +30,15 @@
class Job(db.Model, Timestamp):
"""Job model."""

id = db.Column(UUIDType, primary_key=True)
id = db.Column(UUIDType, primary_key=True, default=uuid.uuid4)
active = db.Column(db.Boolean, default=True, nullable=False)
title = db.Column(db.String(255), nullable=False)
description = db.Column(db.Text)

celery_tasks = db.Column(db.String(255))
task = db.Column(db.String(255))
default_queue = db.Column(db.String(64))
default_args = db.Column(JSON, default=lambda: dict(), nullable=True)
schedule = db.Column(JSON, default=lambda: dict(), nullable=True)
schedule = db.Column(JSON, nullable=True)

# TODO: See if we move this to an API class
@property
Expand All @@ -60,7 +61,7 @@ class RunStatusEnum(enum.Enum):
class Run(db.Model, Timestamp):
"""Run model."""

id = db.Column(UUIDType, primary_key=True)
id = db.Column(UUIDType, primary_key=True, default=uuid.uuid4)

job_id = db.Column(UUIDType, db.ForeignKey(Job.id))
job = db.relationship(Job, backref=db.backref("runs", lazy="dynamic"))
Expand Down Expand Up @@ -116,10 +117,10 @@ def all(cls):
"""Return all tasks."""
if getattr(cls, "_all_tasks", None) is None:
# Cache results
cls._all_tasks = [
cls(task)
for task in current_app.tasks.values()
cls._all_tasks = {
k: cls(task)
for k, task in current_celery_app.tasks.items()
# Filter out Celery internal tasks
if not task.name.startswith("celery.")
]
if not k.startswith("celery.")
}
return cls._all_tasks
20 changes: 20 additions & 0 deletions invenio_jobs/proxies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Proxies."""

from flask import current_app
from werkzeug.local import LocalProxy

def _resolve_jobs_extension():
    """Look up the ``invenio-jobs`` extension on the current Flask app."""
    return current_app.extensions["invenio-jobs"]


def _resolve_jobs_service():
    """Fetch the ``service`` attribute of the Jobs extension."""
    return current_jobs.service


def _resolve_runs_service():
    """Fetch the ``runs_service`` attribute of the Jobs extension."""
    return current_jobs.runs_service


current_jobs = LocalProxy(_resolve_jobs_extension)
"""Jobs extension."""

current_jobs_service = LocalProxy(_resolve_jobs_service)
"""Jobs service."""

current_runs_service = LocalProxy(_resolve_runs_service)
"""Runs service."""
6 changes: 5 additions & 1 deletion invenio_jobs/resources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from invenio_records_resources.resources.records.args import SearchRequestArgsSchema
from invenio_records_resources.services.base.config import ConfiguratorMixin

from ..services.errors import JobNotFoundError


class TasksResourceConfig(ResourceConfig, ConfiguratorMixin):
"""Celery tasks resource config."""
Expand Down Expand Up @@ -50,7 +52,9 @@ class JobsResourceConfig(ResourceConfig, ConfiguratorMixin):

error_handlers = {
**ErrorHandlersMixin.error_handlers,
# TODO: Add custom error handlers here
JobNotFoundError: create_error_handler(
lambda e: HTTPJSONException(code=404, description=e.description)
),
}


Expand Down
11 changes: 10 additions & 1 deletion invenio_jobs/services/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SearchOptions as SearchOptionsBase,
)
from invenio_records_resources.services.records.links import pagination_links
from sqlalchemy import asc, desc

from ..models import Job, Run, Task
from . import results
Expand Down Expand Up @@ -68,7 +69,15 @@ class TasksServiceConfig(ServiceConfig, ConfiguratorMixin):
class JobSearchOptions(SearchOptionsBase):
    """Search options for jobs: sorting by title and the default page size."""

    # Default ordering: by title, ascending.
    sort_default = "title"
    sort_direction_default = "asc"

    # Each direction key maps to its label and the SQLAlchemy ordering function.
    sort_direction_options = {
        "asc": {"title": _("Ascending"), "fn": asc},
        "desc": {"title": _("Descending"), "fn": desc},
    }

    sort_options = {
        "title": {"title": _("Title"), "fields": ["title"]},
    }

    pagination_options = {"default_results_per_page": 25}


class JobsServiceConfig(ServiceConfig, ConfiguratorMixin):
Expand Down
30 changes: 30 additions & 0 deletions invenio_jobs/services/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
# Copyright (C) 2024 University of Münster.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Service definitions."""

from invenio_i18n import gettext as _


class JobsError(Exception):
    """Base class for Jobs errors.

    :param description: Human-readable message; stored on ``self.description``.
    :param args: Extra positional arguments forwarded to ``Exception``.
    """

    def __init__(self, description, *args: object):
        """Constructor."""
        self.description = description
        # Forward the description too. Without it ``self.args`` is empty, so
        # ``str(error)`` is blank and copy/pickle, which rebuild the exception
        # via ``cls(*self.args)``, fail on the missing ``description``.
        super().__init__(description, *args)


class JobNotFoundError(JobsError):
    """Raised when no job exists for the requested ID."""

    def __init__(self, id):
        """Build the error description from the missing job's ID."""
        message = _("Job with ID {id} does not exist.").format(id=id)
        super().__init__(description=message)
20 changes: 17 additions & 3 deletions invenio_jobs/services/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

"""Service results."""

from collections.abc import Iterable, Sized

from flask_sqlalchemy import Pagination
from invenio_records_resources.services.records.results import RecordItem, RecordList


Expand All @@ -16,18 +19,29 @@ class Item(RecordItem):
@property
def id(self):
"""Get the result id."""
return self._record.id
return str(self._record.id)


class List(RecordList):
"""List result."""

@property
def items(self):
    """Return the iterable of result objects.

    A ``Pagination`` wrapper is unwrapped to its page ``items``; any other
    result container is returned unchanged.
    """
    results = self._results
    if isinstance(results, Pagination):
        return results.items
    # Every non-paginated result is handed back as-is, iterable or not, so a
    # separate ``Iterable`` check would return the same value.
    return results

@property
def total(self):
"""Get total number of hits."""
if hasattr(self._results, "hits"):
return self._results.hits.total["value"]
elif isinstance(self._results, (tuple, list)):
if isinstance(self._results, Pagination):
return self._results.total
elif isinstance(self._results, Sized):
return len(self._results)
else:
return None
Expand All @@ -44,7 +58,7 @@ def aggregations(self):
@property
def hits(self):
"""Iterator over the hits."""
for hit in self._results:
for hit in self.items:
# Project the hit
projection = self._schema.dump(
hit,
Expand Down
69 changes: 68 additions & 1 deletion invenio_jobs/services/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@

from invenio_i18n import lazy_gettext as _
from marshmallow import EXCLUDE, Schema, fields, validate
from marshmallow_oneofschema import OneOfSchema
from marshmallow_utils.fields import SanitizedUnicode
from marshmallow_utils.permissions import FieldPermissionsMixin
from marshmallow_utils.validators import LazyOneOf

from ..proxies import current_jobs


def _not_blank(**kwargs):
Expand Down Expand Up @@ -57,6 +61,47 @@ class TaskSchema(Schema, FieldPermissionsMixin):
)


class IntervalScheduleSchema(Schema):
    """Schema for an interval schedule based on ``datetime.timedelta``.

    The duration fields are integers and none of them is marked as required.
    """

    # Constant discriminator; "interval" is the key this schema is registered
    # under in ``ScheduleSchema.type_schemas``.
    type = fields.Constant("interval")

    # Duration components of the interval.
    days = fields.Integer()
    seconds = fields.Integer()
    microseconds = fields.Integer()
    milliseconds = fields.Integer()
    minutes = fields.Integer()
    hours = fields.Integer()
    weeks = fields.Integer()


class CrontabScheduleSchema(Schema):
    """Schema for a crontab schedule.

    Every crontab field is a string that falls back to ``"*"`` on load when
    it is not supplied.
    """

    # Constant discriminator; "crontab" is the key this schema is registered
    # under in ``ScheduleSchema.type_schemas``.
    type = fields.Constant("crontab")

    # Crontab expression parts, each defaulting to the "*" wildcard on load.
    minute = fields.String(load_default="*")
    hour = fields.String(load_default="*")
    day_of_week = fields.String(load_default="*")
    day_of_month = fields.String(load_default="*")
    month_of_year = fields.String(load_default="*")


class ScheduleSchema(OneOfSchema):
    """Schema for a schedule, selecting the concrete schema by ``type``."""

    type_schemas = {
        "interval": IntervalScheduleSchema,
        "crontab": CrontabScheduleSchema,
    }
    type_field_remove = False

    def get_obj_type(self, obj):
        """Return the schema key for ``obj``.

        Dicts carrying a ``type`` key use that value; anything else is
        delegated to ``OneOfSchema.get_obj_type``.
        """
        has_type_key = isinstance(obj, dict) and "type" in obj
        if not has_type_key:
            return super().get_obj_type(obj)
        return obj["type"]


class JobSchema(Schema, FieldPermissionsMixin):
"""Base schema for a job."""

Expand All @@ -67,12 +112,31 @@ class Meta:

id = fields.UUID(dump_only=True)

created = fields.DateTime(dump_only=True)
updated = fields.DateTime(dump_only=True)

title = SanitizedUnicode(required=True, validate=_not_blank(max=250))
description = SanitizedUnicode()

active = fields.Boolean(load_default=True)

task = fields.String(
required=True,
validate=LazyOneOf(choices=lambda: current_jobs.tasks.keys()),
)
default_queue = fields.String(
validate=LazyOneOf(choices=lambda: current_jobs.queues.keys()),
load_default=lambda: current_jobs.default_queue,
)
default_args = fields.Dict(load_default=dict)

schedule = fields.Nested(ScheduleSchema, allow_none=True, load_default=None)

last_run = fields.Nested(lambda: RunSchema, dump_only=True)


class RunSchema(Schema, FieldPermissionsMixin):
"""Base schema for a job."""
"""Base schema for a job run."""

class Meta:
"""Meta attributes for the schema."""
Expand All @@ -81,5 +145,8 @@ class Meta:

id = fields.UUID(dump_only=True)

created = fields.DateTime(dump_only=True)
updated = fields.DateTime(dump_only=True)

title = SanitizedUnicode(required=True, validate=_not_blank(max=250))
description = SanitizedUnicode()
Loading

0 comments on commit f4658ff

Please sign in to comment.