Skip to content

Commit

Permalink
global: implement Tasks backend
Browse files Browse the repository at this point in the history
  • Loading branch information
slint committed May 22, 2024
1 parent ff1f1a0 commit a2dd9a4
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 50 deletions.
20 changes: 19 additions & 1 deletion invenio_jobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,28 @@

"""Configuration."""

JOBS_ADMINISTRATION_DISABLED = False
from invenio_i18n import lazy_gettext as _

from .services.permissions import (
JobPermissionPolicy,
RunPermissionPolicy,
TasksPermissionPolicy,
)

JOBS_TASKS_PERMISSION_POLICY = TasksPermissionPolicy
"""Permission policy for tasks."""

JOBS_PERMISSION_POLICY = JobPermissionPolicy
"""Permission policy for jobs."""

JOBS_RUNS_PERMISSION_POLICY = RunPermissionPolicy
"""Permission policy for job runs."""

JOBS_ADMINISTRATION_DISABLED = False
"""Disable Jobs administration views if ``True``."""

JOBS_FACETS = {}
"""Facets/aggregations for Jobs results."""

JOBS_SORT_OPTIONS = {
"jobs": dict(
Expand Down
44 changes: 44 additions & 0 deletions invenio_jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
"""Models."""

import enum
from inspect import signature

from celery import current_app
from invenio_accounts.models import User
from invenio_db import db
from sqlalchemy.dialects import postgresql
from sqlalchemy_utils import Timestamp
from sqlalchemy_utils.types import ChoiceType, JSONType, UUIDType
from werkzeug.utils import cached_property

JSON = (
db.JSON()
Expand Down Expand Up @@ -79,3 +82,44 @@ class Run(db.Model, Timestamp):
task_id = db.Column(UUIDType, nullable=True)
args = db.Column(JSON, default=lambda: dict(), nullable=True)
queue = db.Column(db.String(64), nullable=True)


class Task:
"""Celery Task model."""

_all_tasks = None

def __init__(self, obj):
"""Initialize model."""
self._obj = obj

def __getattr__(self, name):
"""Proxy attribute access to the task object."""
# TODO: See if we want to limit what attributes are exposed
return getattr(self._obj, name)

@cached_property
def description(self):
"""Return description."""
if not self._obj.__doc__:
return ""
return self._obj.__doc__.split("\n")[0]

@cached_property
def parameters(self):
"""Return the task's parameters."""
# TODO: Make this result more user friendly or enhance with type information
return signature(self._obj).parameters

@classmethod
def all(cls):
"""Return all tasks."""
if getattr(cls, "_all_tasks", None) is None:
# Cache results
cls._all_tasks = [
cls(task)
for task in current_app.tasks.values()
# Filter outer Celery internal tasks
if not task.name.startswith("celery.")
]
return cls._all_tasks
2 changes: 2 additions & 0 deletions invenio_jobs/resources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class TasksResourceConfig(ResourceConfig, ConfiguratorMixin):
url_prefix = "/tasks"
routes = {"list": ""}

request_search_args = SearchRequestArgsSchema


class JobsSearchRequestArgsSchema(SearchRequestArgsSchema):
"""Jobs search request parameters."""
Expand Down
104 changes: 62 additions & 42 deletions invenio_jobs/services/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,61 @@

"""Services config."""

from functools import partial

from invenio_i18n import gettext as _
from invenio_records_resources.services.base import ServiceConfig
from invenio_records_resources.services.base.config import ConfiguratorMixin
from invenio_records_resources.services.base.config import ConfiguratorMixin, FromConfig
from invenio_records_resources.services.records.config import (
SearchOptions as SearchOptionsBase,
)
from invenio_records_resources.services.records.links import pagination_links
from invenio_records_resources.services.records.results import RecordItem, RecordList

from ..models import Job, Run
from ..models import Job, Run, Task
from . import results
from .links import JobLink
from .permissions import JobPermissionPolicy, RunPermissionPolicy, TasksPermissionPolicy
from .schema import JobSchema
from .schema import JobSchema, TaskSchema


class TasksSearchOptions(SearchOptionsBase):
"""Tasks search options."""

sort_default = "name"
sort_direction_default = "asc"
sort_direction_options = {
"asc": dict(
title=_("Ascending"),
fn=partial(sorted, key=lambda t: t.name),
),
"desc": dict(
title=_("Descending"),
fn=partial(sorted, key=lambda t: t.name, reverse=True),
),
}
sort_options = {"name": dict(title=_("Name"), fields=["name"])}

pagination_options = {"default_results_per_page": 25}


class TasksServiceConfig(ServiceConfig, ConfiguratorMixin):
"""TaskService factory configuration."""

# Common configuration
service_id = "tasks"
permission_policy_cls = TasksPermissionPolicy

result_list_cls = RecordList
record_cls = Task
search = TasksSearchOptions
schema = TaskSchema

links_item = {
"self": JobLink("{+api}/tasks"),
}
permission_policy_cls = FromConfig(
"JOBS_TASKS_PERMISSION_POLICY",
default=TasksPermissionPolicy,
)

result_list_cls = results.List

links_item = None
links_search = pagination_links("{+api}/tasks{?args*}")


class JobSearchOptions(SearchOptionsBase):
Expand All @@ -46,28 +74,20 @@ class JobSearchOptions(SearchOptionsBase):
class JobsServiceConfig(ServiceConfig, ConfiguratorMixin):
"""Service factory configuration."""

# Common configuration
service_id = "jobs"
permission_policy_cls = JobPermissionPolicy

# TODO: See if we need to define custom Job result item and list classes
result_item_cls = RecordItem
result_list_cls = RecordList

# Record specific configuration
record_cls = Job

# TODO: See if these are needed since we don't index jobs
# indexer_cls = None
# indexer_queue_name = None
# index_dumper = None

# Search configuration
search = JobSearchOptions

# Service schema
schema = JobSchema

permission_policy_cls = FromConfig(
"JOBS_PERMISSION_POLICY",
default=JobPermissionPolicy,
)

result_item_cls = results.Item
result_list_cls = results.List

links_item = {
"self": JobLink("{+api}/jobs/{id}"),
"runs": JobLink("{+api}/jobs/{id}/runs"),
Expand All @@ -76,33 +96,33 @@ class JobsServiceConfig(ServiceConfig, ConfiguratorMixin):
links_search = pagination_links("{+api}/jobs{?args*}")


class RunSearchOptions(SearchOptionsBase):
"""Run search options."""

# TODO: See what we need to override


class RunsServiceConfig(ServiceConfig, ConfiguratorMixin):
"""Service factory configuration."""

# Common configuration
service_id = "runs"
permission_policy_cls = RunPermissionPolicy

# TODO: See if we need to define custom Job result item and list classes
result_item_cls = RecordItem
result_list_cls = RecordList

# Record specific configuration
record_cls = Run
search = RunSearchOptions
schema = JobSchema

# TODO: See if these are needed since we don't index jobs
# indexer_cls = None
# indexer_queue_name = None
# index_dumper = None
permission_policy_cls = FromConfig(
"JOBS_RUNS_PERMISSION_POLICY",
default=RunPermissionPolicy,
)

# Search configuration
search = JobSearchOptions

# Service schema
schema = JobSchema
result_item_cls = results.Item
result_list_cls = results.List

links_item = {
"self": JobLink("{+api}/jobs/{job_id}/runs/{run_id}"),
"stop": JobLink("{+api}/jobs/{job_id}/runs/{run_id}/actions/stop"),
"logs": JobLink("{+api}/jobs/{job_id}/runs/{run_id}/logs"),
}

links_search = pagination_links("{+api}/jobs/{job_id}{?args*}")
3 changes: 0 additions & 3 deletions invenio_jobs/services/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ class TasksPermissionPolicy(BasePermissionPolicy):
"""Access control configuration for tasks."""

can_search = [Administration()]
can_create = [Administration()]
can_read = [Administration()]
can_update = [Administration()]
can_delete = [Administration()]


class JobPermissionPolicy(BasePermissionPolicy):
Expand Down
46 changes: 46 additions & 0 deletions invenio_jobs/services/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

"""Service schemas."""

import inspect

from invenio_i18n import lazy_gettext as _
from marshmallow import EXCLUDE, Schema, fields, validate
from marshmallow_utils.fields import SanitizedUnicode
Expand All @@ -25,6 +27,36 @@ def _not_blank(**kwargs):
)


class TaskParameterSchema(Schema):
"""Schema for a task parameter."""

name = SanitizedUnicode()

# TODO: Make custom schema for serializing parameter types
default = fields.Method("dump_default")
kind = fields.String()

def dump_default(self, obj):
"""Dump the default value."""
if obj.default in (None, inspect.Parameter.empty):
return None
elif isinstance(obj.default, (bool, int, float, str)):
return obj.default
else:
return str(obj.default)


class TaskSchema(Schema, FieldPermissionsMixin):
"""Schema for a task."""

name = SanitizedUnicode()
description = SanitizedUnicode()
parameters = fields.Dict(
keys=SanitizedUnicode(),
values=fields.Nested(TaskParameterSchema),
)


class JobSchema(Schema, FieldPermissionsMixin):
"""Base schema for a job."""

Expand All @@ -37,3 +69,17 @@ class Meta:

title = SanitizedUnicode(required=True, validate=_not_blank(max=250))
description = SanitizedUnicode()


class RunSchema(Schema, FieldPermissionsMixin):
"""Base schema for a job."""

class Meta:
"""Meta attributes for the schema."""

unknown = EXCLUDE

id = fields.UUID(dump_only=True)

title = SanitizedUnicode(required=True, validate=_not_blank(max=250))
description = SanitizedUnicode()
36 changes: 33 additions & 3 deletions invenio_jobs/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,46 @@

"""Service definitions."""

from invenio_records_resources.services.base import LinksTemplate
from invenio_records_resources.services.base.utils import map_search_params
from invenio_records_resources.services.records import RecordService
from invenio_records_resources.services.uow import unit_of_work

from ..models import Task


class TasksService(RecordService):
"""Tasks service."""

def search(self, identity, **kwargs):
"""Search for jobs."""
raise NotImplementedError()
def search(self, identity, params):
"""Search for tasks."""
self.require_permission(identity, "search")

# TODO: Use an API class
tasks = Task.all()

search_params = map_search_params(self.config.search, params)
query_param = search_params["q"]
if query_param:
tasks = [
task
for task in tasks
if (
query_param in task.name.lower()
or query_param in task.description.lower()
)
]
sort_direction = search_params["sort_direction"]
tasks = sort_direction(tasks)

return self.result_list(
service=self,
identity=identity,
results=tasks,
params=search_params,
links_tpl=LinksTemplate(self.config.links_search, context={"args": params}),
links_item_tpl=self.links_item_tpl,
)


class JobsService(RecordService):
Expand Down
Loading

0 comments on commit a2dd9a4

Please sign in to comment.