Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve CSW-based harvesting #184

Merged
merged 13 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ repos:
- types-setuptools==57.4.9
- types-python-dateutil==2.8.9
- types-python-slugify==5.0.3
- types-pyyaml==6.0.8
args: [--ignore-missing-imports]
4 changes: 4 additions & 0 deletions ckanext/dalrrd_emc_dcpr/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ class _CkanEmcDataset:
sasdi_theme: typing.Optional[str] = None
tags: typing.List[typing.Dict] = dataclasses.field(default_factory=list)
source: typing.Optional[str] = None
license_id: typing.Optional[str] = None
version: typing.Optional[str] = None
lineage: typing.Optional[str] = None
featured: typing.Optional[bool] = False

def to_data_dict(self) -> typing.Dict:
result = {}
Expand Down
40 changes: 40 additions & 0 deletions ckanext/dalrrd_emc_dcpr/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import sys
import time
import traceback
import typing
from concurrent import futures
Expand All @@ -23,6 +24,8 @@
from lxml import etree
from sqlalchemy import text as sla_text

from ckanext.harvest import utils as harvest_utils

from .. import provide_request_context
from ckanext.dalrrd_emc_dcpr.model.dcpr_request import (
DCPRRequest,
Expand Down Expand Up @@ -1143,3 +1146,40 @@ def drop_materialized_view():
sla_text(f"DROP MATERIALIZED VIEW {_PYCSW_MATERIALIZED_VIEW_NAME}")
)
logger.info("Done!")


@extra_commands.command()
@click.option(
    "--post-run-delay-seconds",
    help="How much time to sleep after performing the harvesting command",
    # Explicit type keeps the parsed value in sync with the annotation below
    # (click would otherwise infer it from the default).
    type=int,
    default=(60 * 5),
)
@click.pass_context
def harvesting_dispatcher(ctx, post_run_delay_seconds: int):
    """Manages the harvesting queue and then sleeps a while after that.

    This command takes care of submitting pending jobs and marking done jobs as finished.

    It is similar to ckanext.harvest's `harvester run` CLI command, with the difference
    being that this command is designed to run and then wait a specific amount of time
    before exiting. This is a workaround for the fact that it is not possible to
    specify a delay period when restarting docker containers in docker-compose's normal
    mode.

    NOTE: This command is not needed when running under k8s or docker-compose swarm
    mode, as these offer other ways to control periodic services. In that case you can
    simply configure the periodic service and then use

    `launch-ckan-cli harvester run`

    as the container's CMD instruction.

    """

    # The flask app is read from the click context's `meta` mapping under the
    # "flask_app" key; the harvester run is performed inside a test request
    # context created from it.
    flask_app = ctx.meta["flask_app"]
    with flask_app.test_request_context():
        logger.info("Calling harvester run command...")
        harvest_utils.run_harvester()
    # Sleep only after the request context has been released, so nothing is
    # held open while the process is idle waiting to exit.
    logger.info(f"Sleeping for {post_run_delay_seconds!r} seconds...")
    time.sleep(post_run_delay_seconds)
    logger.info("Done!")
4 changes: 3 additions & 1 deletion ckanext/dalrrd_emc_dcpr/cli/docker_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import sys
import time
import traceback

import click

Expand Down Expand Up @@ -77,9 +78,10 @@ def _wait_for_ckan_env(
try:
load_environment(config)
except Exception as exc:
formatted_exc = traceback.format_exc()
click.secho(
f"({current_attempt}/{total_tries}) - ckan environment is not "
f"available yet: {str(exc)}",
f"available yet: {formatted_exc}",
fg="red",
)
click.secho(f"Retrying in {pause_for} seconds...")
Expand Down
10 changes: 6 additions & 4 deletions ckanext/dalrrd_emc_dcpr/logic/action/ckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def package_create(original_action, context, data_dict):
Intercepts the core `package_create` action to check if package
is being published after being created.
"""
return _package_publish_check(original_action, context, data_dict)
return _act_depending_on_package_visibility(original_action, context, data_dict)


@toolkit.chained_action
Expand All @@ -98,15 +98,15 @@ def package_update(original_action, context, data_dict):
Intercepts the core `package_update` action to check if package is being published.
"""
logger.debug(f"inside package_update action: {data_dict=}")
return _package_publish_check(original_action, context, data_dict)
return _act_depending_on_package_visibility(original_action, context, data_dict)


@toolkit.chained_action
def package_patch(original_action, context, data_dict):
"""
Intercepts the core `package_patch` action to check if package is being published.
"""
return _package_publish_check(original_action, context, data_dict)
return _act_depending_on_package_visibility(original_action, context, data_dict)


def user_patch(context: typing.Dict, data_dict: typing.Dict) -> typing.Dict:
Expand Down Expand Up @@ -138,7 +138,9 @@ def user_patch(context: typing.Dict, data_dict: typing.Dict) -> typing.Dict:
return update_action(context, patched)


def _package_publish_check(action, context, data):
def _act_depending_on_package_visibility(
action: typing.Callable, context: typing.Dict, data: typing.Dict
):
remains_private = toolkit.asbool(data.get("private", True))
if remains_private:
result = action(context, data)
Expand Down
70 changes: 35 additions & 35 deletions ckanext/dalrrd_emc_dcpr/logic/auth/ckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import typing

import ckan.plugins.toolkit as toolkit
from ckan.logic.auth import get_package_object

from ckanext.harvest.utils import DATASET_TYPE_NAME as CKANEXT_HARVEST_DATASET_TYPE_NAME

logger = logging.getLogger(__name__)

Expand All @@ -16,45 +19,42 @@ def package_update(next_auth, context, data_dict=None):
"""

user = context["auth_user_obj"]
if data_dict is None:
if user.sysadmin:
final_result = next_auth(context, data_dict)
else:
package = toolkit.get_action("package_show")(
context=context, data_dict=data_dict
)
logger.debug(f"{package=}")
if user.sysadmin:
result = {"success": True}
elif package.get("private", False):
result = {"success": True}
elif package.get("state") == "draft":
result = {"success": True}
elif data_dict is not None:
# NOTE: we do not call toolkit.get_action("package_show") here but rather do it
# the same as vanilla CKAN which uses a custom way to retrieve the object from
# the context - this is in order to ensure other extensions
# (e.g. ckanext.harvest) are able to function correctly
package = get_package_object(context, data_dict)
if package.type == CKANEXT_HARVEST_DATASET_TYPE_NAME:
# defer auth to the ckanext.harvest extension
final_result = next_auth(context, data_dict)
else:
org_id = data_dict.get("owner_org", package.get("owner_org"))
if org_id is not None:
members_action = toolkit.get_action("member_list")
members = members_action(
data_dict={"id": org_id, "object_type": "user"}
)
for member_id, _, role in members:
if member_id == user.id and role.lower() == "admin":
result = {"success": True}
break
else:
org_name = package.get("organization", {}).get("name", "") or org_id
result = {
"success": False,
"msg": (
f"Only administrators of organization {org_name!r} are "
result = {"success": False}
if package.private or package.state == "draft":
result["success"] = True
else:
org_id = data_dict.get("owner_org", package.owner_org)
if org_id is not None:
members = toolkit.get_action("member_list")(
data_dict={"id": org_id, "object_type": "user"}
)
for member_id, _, role in members:
if member_id == user.id and role.lower() == "admin":
result["success"] = True
break
else:
result["msg"] = (
f"Only administrators of organization {org_id!r} are "
f"authorized to edit one of its public datasets"
),
}
)
if result["success"]:
final_result = next_auth(context, data_dict)
else:
result = {"success": False}
if result["success"]:
final_result = next_auth(context, data_dict)
else:
final_result = result
final_result = result
else:
final_result = next_auth(context, data_dict)
return final_result


Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,31 @@
from flask import Blueprint
from sqlalchemy import orm

from . import (
from ckanext.harvest.utils import DATASET_TYPE_NAME as HARVEST_DATASET_TYPE_NAME

from .. import (
constants,
helpers,
)
from .blueprints.dcpr import dcpr_blueprint
from .blueprints.emc import emc_blueprint
from .cli import commands
from .cli.legacy_sasdi import commands as legacy_sasdi_commands
from .logic.action import ckan as ckan_actions
from .logic.action.dcpr import create as dcpr_create_actions
from .logic.action.dcpr import delete as dcpr_delete_actions
from .logic.action.dcpr import get as dcpr_get_actions
from .logic.action.dcpr import update as dcpr_update_actions
from .logic.action import emc as emc_actions
from .logic import (
from ..blueprints.dcpr import dcpr_blueprint
from ..blueprints.emc import emc_blueprint
from ..cli import commands
from ..cli.legacy_sasdi import commands as legacy_sasdi_commands
from ..logic.action import ckan as ckan_actions
from ..logic.action.dcpr import create as dcpr_create_actions
from ..logic.action.dcpr import delete as dcpr_delete_actions
from ..logic.action.dcpr import get as dcpr_get_actions
from ..logic.action.dcpr import update as dcpr_update_actions
from ..logic.action import emc as emc_actions
from ..logic import (
converters,
validators,
)
from .logic.auth import ckan as ckan_auth
from .logic.auth import pages as ckanext_pages_auth
from .logic.auth import dcpr as dcpr_auth
from .logic.auth import emc as emc_auth
from .model.user_extra_fields import UserExtraFields
from ..logic.auth import ckan as ckan_auth
from ..logic.auth import pages as ckanext_pages_auth
from ..logic.auth import dcpr as dcpr_auth
from ..logic.auth import emc as emc_auth
from ..model.user_extra_fields import UserExtraFields

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -202,9 +204,9 @@ def read(self, entity):
return entity

def update_config(self, config_):
toolkit.add_template_directory(config_, "templates")
toolkit.add_public_directory(config_, "public")
toolkit.add_resource("assets", "ckanext-dalrrdemcdcpr")
toolkit.add_template_directory(config_, "../templates")
toolkit.add_public_directory(config_, "../public")
toolkit.add_resource("../assets", "ckanext-dalrrdemcdcpr")

def get_commands(self):
return [
Expand Down Expand Up @@ -339,13 +341,15 @@ def get_blueprint(self) -> typing.List[Blueprint]:
def dataset_facets(
self, facets_dict: typing.OrderedDict, package_type: str
) -> typing.OrderedDict:
facets_dict[f"vocab_{constants.SASDI_THEMES_VOCABULARY_NAME}"] = toolkit._(
"SASDI Theme"
)
facets_dict[f"vocab_{constants.ISO_TOPIC_CATEGOY_VOCABULARY_NAME}"] = toolkit._(
"ISO Topic Category"
)
facets_dict["reference_date"] = toolkit._("Reference Date")
if package_type != HARVEST_DATASET_TYPE_NAME:
facets_dict[f"vocab_{constants.SASDI_THEMES_VOCABULARY_NAME}"] = toolkit._(
"SASDI Theme"
)
facets_dict[
f"vocab_{constants.ISO_TOPIC_CATEGOY_VOCABULARY_NAME}"
] = toolkit._("ISO Topic Category")
facets_dict["reference_date"] = toolkit._("Reference Date")
facets_dict["harvest_source_title"] = toolkit._("Harvest source")
return facets_dict

def group_facets(
Expand Down
Loading