diff --git a/udata/core/dataset/rdf.py b/udata/core/dataset/rdf.py index e9c53961bd..3bb900dc62 100644 --- a/udata/core/dataset/rdf.py +++ b/udata/core/dataset/rdf.py @@ -75,17 +75,6 @@ EUFREQ.NEVER: 'punctual', } -# Map High Value Datasets URIs to keyword categories -EU_HVD_CATEGORIES = { - "http://data.europa.eu/bna/c_164e0bf5": "Météorologiques", - "http://data.europa.eu/bna/c_a9135398": "Entreprises et propriété d'entreprises", - "http://data.europa.eu/bna/c_ac64a52d": "Géospatiales", - "http://data.europa.eu/bna/c_b79e35eb": "Mobilité", - "http://data.europa.eu/bna/c_dd313021": "Observation de la terre et environnement", - "http://data.europa.eu/bna/c_e1da4e07": "Statistiques" -} - - def temporal_to_rdf(daterange, graph=None): if not daterange: return diff --git a/udata/harvest/backends/base.py b/udata/harvest/backends/base.py index 60eb5447a7..f57f792d90 100644 --- a/udata/harvest/backends/base.py +++ b/udata/harvest/backends/base.py @@ -8,6 +8,7 @@ import requests from flask import current_app +from udata.core.dataservices.models import Dataservice from voluptuous import MultipleInvalid, RequiredFieldInvalid from udata.core.dataset.models import HarvestDatasetMetadata @@ -153,6 +154,9 @@ def inner_harvest(self): def inner_process_dataset(self, item: HarvestItem) -> Dataset: raise NotImplementedError + def inner_process_dataservice(self, item: HarvestItem) -> Dataservice: + raise NotImplementedError + def harvest(self): log.debug(f'Starting harvesting {self.source.name} ({self.source.url})…') factory = HarvestJob if self.dryrun else HarvestJob.objects.create @@ -238,6 +242,54 @@ def is_done(self) -> bool: '''Should be called after process_dataset to know if we reach the max items''' return self.max_items and len(self.job.items) >= self.max_items + def process_dataservice(self, remote_id: str, **kwargs) -> bool: + ''' + Return `True` if the parent should stop iterating because we exceed the number + of items to process. 
+ ''' + log.debug(f'Processing dataservice {remote_id}…') + + # TODO add `type` to `HarvestItem` to differentiate `Dataset` from `Dataservice` + item = HarvestItem(status='started', started=datetime.utcnow(), remote_id=remote_id) + self.job.items.append(item) + self.save_job() + + try: + dataservice = self.inner_process_dataservice(item, **kwargs) + + dataservice.harvest = self.update_harvest_info(dataservice.harvest, remote_id) + dataservice.archived = None + + # TODO: Apply editable mappings + + if self.dryrun: + dataservice.validate() + else: + dataservice.save() + item.dataservice = dataservice + item.status = 'done' + except HarvestSkipException as e: + item.status = 'skipped' + + log.info(f'Skipped item {item.remote_id} : {safe_unicode(e)}') + item.errors.append(HarvestError(message=safe_unicode(e))) + except HarvestValidationError as e: + item.status = 'failed' + + log.info(f'Error validating item {item.remote_id} : {safe_unicode(e)}') + item.errors.append(HarvestError(message=safe_unicode(e))) + except Exception as e: + item.status = 'failed' + log.exception(f'Error while processing {item.remote_id} : {safe_unicode(e)}') + + error = HarvestError(message=safe_unicode(e), details=traceback.format_exc()) + item.errors.append(error) + finally: + item.ended = datetime.utcnow() + self.save_job() + + return self.is_done() + def update_harvest_info(self, harvest: Optional[HarvestDatasetMetadata], remote_id: int): if not harvest: harvest = HarvestDatasetMetadata() @@ -313,6 +363,28 @@ def get_dataset(self, remote_id): return Dataset(owner=self.source.owner) return Dataset() + + def get_dataservice(self, remote_id): + '''Get or create a dataservice given its remote ID (and its source) + We first try to match `source_id` to be source domain independent + ''' + dataservice = Dataservice.objects(__raw__={ + 'harvest.remote_id': remote_id, + '$or': [ + {'harvest.domain': self.source.domain}, + {'harvest.source_id': str(self.source.id)}, + ], + }).first() + + if dataservice: + return dataservice + 
+ if self.source.organization: + return Dataservice(organization=self.source.organization) + elif self.source.owner: + return Dataservice(owner=self.source.owner) + + return Dataservice() def validate(self, data, schema): '''Perform a data validation against a given schema. @@ -353,4 +447,4 @@ def validate(self, data, schema): msg = str(error) errors.append(msg) msg = '\n- '.join(['Validation error:'] + errors) - raise HarvestValidationError(msg) \ No newline at end of file + raise HarvestValidationError(msg) diff --git a/udata/harvest/backends/dcat.py b/udata/harvest/backends/dcat.py index 2f4161c1d3..bd2b29651f 100644 --- a/udata/harvest/backends/dcat.py +++ b/udata/harvest/backends/dcat.py @@ -1,19 +1,17 @@ import logging -from rdflib import Graph, URIRef +from rdflib import Graph from rdflib.namespace import RDF import lxml.etree as ET -import boto3 from flask import current_app from datetime import date -import json -from typing import Generator, List +from typing import Generator -from udata.core.dataset.models import Dataset from udata.rdf import ( DCAT, DCT, HYDRA, SPDX, namespace_manager, guess_format, url_from_rdf ) from udata.core.dataset.rdf import dataset_from_rdf +from udata.core.dataservices.rdf import dataservice_from_rdf from udata.storage.s3 import store_as_json, get_from_json from udata.harvest.models import HarvestItem @@ -71,7 +69,8 @@ def 
inner_harvest(self): self.process_one_datasets_page(page_number, page) serialized_graphs.append(page.serialize(format=fmt, indent=None)) - # TODO call `walk_graph` with `process_dataservices` + for page_number, page in self.walk_graph(self.source.url, fmt): + self.process_one_dataservices_page(page_number, page) # The official MongoDB document size in 16MB. The default value here is 15MB to account for other fields in the document (and for difference between * 1024 vs * 1000). max_harvest_graph_size_in_mongo = current_app.config.get('HARVEST_MAX_CATALOG_SIZE_IN_MONGO') @@ -148,6 +147,14 @@ def process_one_datasets_page(self, page_number: int, page: Graph): if self.is_done(): return + + def process_one_dataservices_page(self, page_number: int, page: Graph): + for node in page.subjects(RDF.type, DCAT.DataService): + remote_id = page.value(node, DCT.identifier) + self.process_dataservice(remote_id, page_number=page_number, page=page, node=node) + + if self.is_done(): + return def inner_process_dataset(self, item: HarvestItem, page_number: int, page: Graph, node): item.kwargs['page_number'] = page_number @@ -155,6 +162,12 @@ def inner_process_dataset(self, item: HarvestItem, page_number: int, page: Graph dataset = self.get_dataset(item.remote_id) return dataset_from_rdf(page, dataset, node=node) + def inner_process_dataservice(self, item: HarvestItem, page_number: int, page: Graph, node): + item.kwargs['page_number'] = page_number + + dataservice = self.get_dataservice(item.remote_id) + return dataservice_from_rdf(page, dataservice, node=node) + def get_node_from_item(self, graph, item): for node in graph.subjects(RDF.type, DCAT.Dataset): if str(graph.value(node, DCT.identifier)) == item.remote_id: diff --git a/udata/harvest/models.py b/udata/harvest/models.py index 032f5f09de..e1676e4b92 100644 --- a/udata/harvest/models.py +++ b/udata/harvest/models.py @@ -3,6 +3,7 @@ import logging from urllib.parse import urlparse +from udata.core.dataservices.models import 
Dataservice from werkzeug.utils import cached_property from udata.core.dataset.models import HarvestDatasetMetadata @@ -53,6 +54,7 @@ class HarvestError(db.EmbeddedDocument): class HarvestItem(db.EmbeddedDocument): remote_id = db.StringField() dataset = db.ReferenceField(Dataset) + dataservice = db.ReferenceField(Dataservice) status = db.StringField(choices=list(HARVEST_ITEM_STATUS), default=DEFAULT_HARVEST_ITEM_STATUS, required=True) created = db.DateTimeField(default=datetime.utcnow, required=True) diff --git a/udata/rdf.py b/udata/rdf.py index d0096115f4..9fa38f87e8 100644 --- a/udata/rdf.py +++ b/udata/rdf.py @@ -5,7 +5,7 @@ import logging import re -from flask import request, url_for, abort +from flask import request, url_for, abort, current_app from rdflib import Graph, Literal, URIRef from rdflib.resource import Resource as RdfResource @@ -101,6 +101,15 @@ # Includes control characters, unicode surrogate characters and unicode end-of-plane non-characters ILLEGAL_XML_CHARS = '[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]' +# Map High Value Datasets URIs to keyword categories +EU_HVD_CATEGORIES = { + "http://data.europa.eu/bna/c_164e0bf5": "Météorologiques", + "http://data.europa.eu/bna/c_a9135398": "Entreprises et propriété d'entreprises", + "http://data.europa.eu/bna/c_ac64a52d": "Géospatiales", + "http://data.europa.eu/bna/c_b79e35eb": "Mobilité", + "http://data.europa.eu/bna/c_dd313021": "Observation de la terre et environnement", + "http://data.europa.eu/bna/c_e1da4e07": "Statistiques" +} def guess_format(string): '''Guess format given an extension or a mime-type'''