add geospatial search and update some comments #15

Open. Wants to merge 54 commits into base: d3m.

Commits (54):
77b0ec8
add geospatial search
yuqiuhe Sep 5, 2019
aff6ec9
update some comments
yuqiuhe Sep 5, 2019
fe4a5cb
Merge branch 'd3m' into yuqiuhe
yuqiuhe Sep 5, 2019
8b265cf
add geospatial_related
yuqiuhe Sep 6, 2019
bd3382a
change debug to info
yuqiuhe Sep 6, 2019
a8e221e
Merge branch 'yuqiuhe' of https://github.com/usc-isi-i2/datamart-user…
yuqiuhe Sep 6, 2019
ae89bcb
prevent empty from two points
yuqiuhe Sep 6, 2019
5ee81cc
change debug to info
yuqiuhe Sep 6, 2019
d937746
Merge branch 'd3m' of https://github.com/usc-isi-i2/datamart-userend …
yuqiuhe Sep 10, 2019
2845ca9
give augment timeout and save failed results as None
ckxz105 Sep 16, 2019
a4b822d
fix bug that failed augment will still run again
ckxz105 Sep 17, 2019
f817d2e
better response for fail augment
ckxz105 Sep 18, 2019
4332ba4
improve general search cache
ckxz105 Sep 18, 2019
d05d3b4
use cache in _run_wikifier of entries.py
ckxz105 Sep 18, 2019
ef6301b
change wikifier port
ckxz105 Sep 20, 2019
68d5e07
fix error if augment timeout
ckxz105 Sep 21, 2019
47843c9
add _search_geospatial_data
yuqiuhe Sep 23, 2019
cfb1ead
Merge branch 'd3m' of https://github.com/usc-isi-i2/datamart-userend …
yuqiuhe Sep 23, 2019
54dedee
Merge branch 'test_augment_change' of https://github.com/usc-isi-i2/d…
yuqiuhe Sep 23, 2019
aad4660
update for docker write address
ckxz105 Sep 23, 2019
ff0cd85
update search_geospatial_data function
yuqiuhe Sep 25, 2019
04c2a33
update wikidata vector server address
ckxz105 Sep 25, 2019
223c4ab
not augment for n-m condition now -- testing...
ckxz105 Sep 26, 2019
14c6f34
Merge pull request #16 from usc-isi-i2/yuqiuhe
ckxz105 Sep 26, 2019
bf4d2e0
better log for wikifier
ckxz105 Sep 26, 2019
77786c2
Merge branch 'test_augment_change' of https://github.com/usc-isi-i2/d…
ckxz105 Sep 26, 2019
af2d36f
better log for wikifier
ckxz105 Sep 26, 2019
52ca147
update check on n-m condition
ckxz105 Sep 27, 2019
e2e17db
fix cache bug in wikifier
ckxz105 Sep 27, 2019
a4f8af2
improve wikifier choice cache
ckxz105 Sep 27, 2019
63d9464
change duplicate name on wikifier
ckxz105 Sep 27, 2019
bd097c0
fix bug on basic_profiler
ckxz105 Sep 28, 2019
d0f9e93
update metadata generate method
ckxz105 Sep 30, 2019
938b8b7
fix semantic type bug for datetime
ckxz105 Sep 30, 2019
2f88353
typo
ckxz105 Sep 30, 2019
ff5eff5
fix bug on wikifier
ckxz105 Sep 30, 2019
788040c
Merge
Oct 4, 2019
cde8602
update service
Oct 4, 2019
9850e08
update redis and query cache service init
Oct 5, 2019
39e47f7
change wikifier cache to save on metadata_cache instead of /tmp, add …
ckxz105 Oct 9, 2019
e244140
sort key after augment
ckxz105 Oct 10, 2019
c838178
update service json
Oct 10, 2019
54de5f6
merge
Oct 10, 2019
bd24526
update host
ckxz105 Oct 10, 2019
fbeb8f2
add new example, fix bug on augment with vector
ckxz105 Oct 11, 2019
584a701
server side config
Oct 11, 2019
9d79bfa
server side service definitions
Oct 14, 2019
3a02639
clean up
Oct 14, 2019
96c58c2
add dataset materializer cache
ckxz105 Oct 15, 2019
80f75ef
make dirs
Oct 16, 2019
b4326ed
add blacklist
yuqiuhe Oct 21, 2019
3e070c5
resolve conflict
yuqiuhe Oct 21, 2019
b0bede2
remove blacklist
yuqiuhe Oct 21, 2019
46ef365
resolve the conflict
yuqiuhe Oct 21, 2019
1 change: 1 addition & 0 deletions datamart_isi/__init__.py
@@ -1,3 +1,4 @@
name = "datamart_isi"
from .entries import DatamartSearchResult, DatasetColumn
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
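
With DatamartSearchResult and DatasetColumn re-exported at the package root, downstream code no longer has to reach into the entries module. A minimal usage sketch (assuming the package is installed; constructor arguments are outside the scope of this diff):

# Sketch, not part of the diff: the re-export makes the package-level import work.
from datamart_isi import DatamartSearchResult, DatasetColumn  # previously: from datamart_isi.entries import ...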
58 changes: 55 additions & 3 deletions datamart_isi/augment.py
@@ -11,6 +11,8 @@
from datamart_isi.utilities import connection
from SPARQLWrapper import SPARQLWrapper, JSON, POST, URLENCODED
from itertools import chain
from datamart_isi.utilities.geospatial_related import GeospatialRelated
from datamart_isi.cache.wikidata_cache import QueryCache


class Augment(object):
@@ -28,6 +30,7 @@ def __init__(self) -> None:
self.qm.setRequestMethod(URLENCODED)
self.profiler = Profiler()
self.logger = logging.getLogger(__name__)
self.wikidata_cache_manager = QueryCache()

def query_by_sparql(self, query: dict, dataset: pd.DataFrame = None) -> typing.Optional[typing.List[dict]]:
"""
@@ -50,7 +53,7 @@ def query_by_sparql(self, query: dict, dataset: pd.DataFrame = None) -> typing.O
return []
return results
else:
print("\n\n[ERROR] No query given, query failed!\n\n")
self.logger.error("No query given, query failed!")
return []

def parse_sparql_query(self, json_query, dataset) -> str:
@@ -112,11 +115,11 @@ def parse_sparql_query(self, json_query, dataset) -> str:
if "variables_search" in json_query.keys() and json_query["variables_search"] != {}:
if "temporal_variable" in json_query["variables_search"].keys():
tv = json_query["variables_search"]["temporal_variable"]
TemporalGranularity = {'second': 14, 'minute': 13, 'hour': 12, 'day': 11, 'month': 10, 'year': 9}
temporal_granularity = {'second': 14, 'minute': 13, 'hour': 12, 'day': 11, 'month': 10, 'year': 9}

start_date = pd.to_datetime(tv["start"]).isoformat()
end_date = pd.to_datetime(tv["end"]).isoformat()
granularity = TemporalGranularity[tv["granularity"]]
granularity = temporal_granularity[tv["granularity"]]
spaqrl_query += '''
?variable pq:C2013 ?time_granularity .
?variable pq:C2011 ?start_time .
@@ -125,6 +128,21 @@ def parse_sparql_query(self, json_query, dataset) -> str:
FILTER(!((?start_time > "''' + end_date + '''"^^xsd:dateTime) || (?end_time < "''' + start_date + '''"^^xsd:dateTime)))
'''

if "geospatial_variable" in json_query["variables_search"].keys():
geo_variable = json_query["variables_search"]["geospatial_variable"]
qnodes = self.parse_geospatial_query(geo_variable)
if qnodes:
# find similar dataset from datamart
query_part = " ".join(qnodes)
# query_part = "q1494 q1400 q759 q1649 q1522 q1387 q16551" # COMMENT: for testing
spaqrl_query += '''
?variable pq:C2006 [
bds:search """''' + query_part + '''""" ;
bds:relevance ?score_geo ;
].
'''
bind = "?score_geo" if bind == "" else bind + "+ ?score_geo"

# if "title_search" in json_query.keys() and json_query["title_search"] != '':
# query_title = json_query["title_search"]
# spaqrl_query += '''
@@ -140,6 +158,40 @@ def parse_sparql_query(self, json_query, dataset) -> str:
spaqrl_query += "\n }" + "\n" + ORDER + "\n" + LIMIT

return spaqrl_query

def parse_geospatial_query(self, geo_variable):
geo_gra_dict = {'country': 'Q6256', 'state': 'Q7275', 'city': 'Q515', 'county': 'Q28575',
'postal_code': 'Q37447'}
qnodes = set()

# located inside a bounding box
if "latitude1" in geo_variable.keys() and "latitude2" in geo_variable.keys():
geo1_related = GeospatialRelated(float(geo_variable["latitude1"]), float(geo_variable["longitude1"]))
geo1_related.coordinate_transform() # axis transformation
geo2_related = GeospatialRelated(float(geo_variable["latitude2"]), float(geo_variable["longitude2"]))
geo2_related.coordinate_transform()
# find top left point and bottom right point
top_left_point, botm_right_point = geo1_related.distinguish_two_points(geo2_related)
granularity = geo_gra_dict[geo_variable["granularity"]]

if top_left_point and botm_right_point:
# get Q nodes located inside a geospatial bounding box from wikidata query
sparql_query = "select distinct ?place where \n{\n ?place wdt:P31/wdt:P279* wd:" + granularity + " .\n" \
+ "SERVICE wikibase:box {\n ?place wdt:P625 ?location .\n" \
+ "bd:serviceParam wikibase:cornerWest " + "\"Point(" + str(
top_left_point[0]) + " " + str(top_left_point[1]) + ")\"^^geo:wktLiteral .\n" \
+ "bd:serviceParam wikibase:cornerEast " + "\"Point(" + str(
botm_right_point[0]) + " " + str(botm_right_point[1]) + ")\"^^geo:wktLiteral .\n}\n" \
+ "SERVICE wikibase:label { bd:serviceParam wikibase:language \"en\" }\n}\n"
results = self.wikidata_cache_manager.get_result(sparql_query)
if results:
for each in results:
value = each["place"]["value"]
value = value.split('/')[-1]
qnodes.add(value)

return qnodes

#
# def query(self,
# col: pd.Series = None,
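
For reviewers unfamiliar with the Blazegraph geospatial service, the sketch below reproduces the shape of the bounding-box lookup that parse_geospatial_query builds, but as a standalone query against the public Wikidata endpoint instead of the repository's QueryCache. The endpoint URL, function name, and example coordinates are assumptions for illustration only.

# Sketch, not part of the diff: standalone version of the wikibase:box lookup.
from SPARQLWrapper import SPARQLWrapper, JSON

def qnodes_in_box(granularity_qnode, top_left, bottom_right,
                  endpoint="https://query.wikidata.org/sparql"):  # assumed endpoint
    # WKT points are written as "Point(longitude latitude)".
    sparql_query = (
        "SELECT DISTINCT ?place WHERE {\n"
        "  ?place wdt:P31/wdt:P279* wd:" + granularity_qnode + " .\n"
        "  SERVICE wikibase:box {\n"
        "    ?place wdt:P625 ?location .\n"
        "    bd:serviceParam wikibase:cornerWest \"Point("
        + str(top_left[0]) + " " + str(top_left[1]) + ")\"^^geo:wktLiteral .\n"
        "    bd:serviceParam wikibase:cornerEast \"Point("
        + str(bottom_right[0]) + " " + str(bottom_right[1]) + ")\"^^geo:wktLiteral .\n"
        "  }\n"
        "}"
    )
    engine = SPARQLWrapper(endpoint)
    engine.setQuery(sparql_query)
    engine.setReturnFormat(JSON)
    results = engine.query().convert()["results"]["bindings"]
    # Keep only the trailing Q identifier, mirroring value.split('/')[-1] in the diff.
    return {row["place"]["value"].split("/")[-1] for row in results}

# Example: Q515 (city) entities inside a hypothetical box around Los Angeles.
# qnodes_in_box("Q515", (-118.7, 34.3), (-118.1, 33.7))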
26 changes: 13 additions & 13 deletions datamart_isi/cache/general_search_cache.py
@@ -75,7 +75,19 @@ def add_to_memcache(self, supplied_dataframe, search_result_serialized, augment_
else:
raise ValueError("Unsupport type of supplied_data result as " + str(type(supplied_dataframe)) + "!")

path_to_augment_results = os.path.join(config.cache_file_storage_base_loc, hash_key + ".pkl")
# add supplied data for further updating if needed
try:
search_result_json = json.loads(search_result_serialized)
if "wikifier_choice" in search_result_json:
storage_loc = os.path.join(config.cache_file_storage_base_loc, "wikifier_cache")
else:
storage_loc = os.path.join(config.cache_file_storage_base_loc, "general_search_cache")
except:
storage_loc = os.path.join(config.cache_file_storage_base_loc, "other_cache")

path_to_supplied_dataframe = os.path.join(storage_loc, str(hash_supplied_dataframe) + ".pkl")
path_to_augment_results = os.path.join(storage_loc, hash_key + ".pkl")

with open(path_to_augment_results, "wb") as f:
pickle.dump(augment_results, f)

@@ -88,18 +100,6 @@ def add_to_memcache(self, supplied_dataframe, search_result_serialized, augment_
if not response_code2:
self._logger.warning("Pushing timestamp failed! What happened???")

# add supplied data for further updating if needed
if search_result_serialized:
try:
search_result_json = json.loads(search_result_serialized)
if "wikifier_choice" in search_result_json:
storage_loc = os.path.join(config.cache_file_storage_base_loc, "wikifier_cache")
else:
storage_loc = os.path.join(config.cache_file_storage_base_loc, "general_search_cache")
except:
storage_loc = os.path.join(config.cache_file_storage_base_loc, "other_cache")

path_to_supplied_dataframe = os.path.join(storage_loc, str(hash_supplied_dataframe) + ".pkl")
with open(path_to_supplied_dataframe, "wb") as f:
pickle.dump(supplied_dataframe, f)
response_code3 = self.mc.set("supplied_data" + hash_key, path_to_supplied_dataframe)
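
The point of this hunk is to decide the storage directory before any pickle is written, so the augment result and the supplied dataframe land in the same subfolder. A simplified sketch of that routing, with only the subdirectory names taken from the diff:

# Sketch, not part of the diff: simplified routing of cache files by result type.
import json
import os

def pick_storage_loc(search_result_serialized, base_loc):
    # Wikifier results and general search results go to separate subdirectories;
    # anything that cannot be parsed as JSON falls back to "other_cache".
    try:
        search_result_json = json.loads(search_result_serialized)
        sub_dir = "wikifier_cache" if "wikifier_choice" in search_result_json else "general_search_cache"
    except (TypeError, ValueError):
        sub_dir = "other_cache"
    return os.path.join(base_loc, sub_dir)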
141 changes: 120 additions & 21 deletions datamart_isi/cache/metadata_cache.py
@@ -3,18 +3,72 @@
import os
import logging
import json
import pandas as pd

from d3m.container.dataset import D3MDatasetLoader
from d3m.container import Dataset as d3m_Dataset
from d3m.metadata.base import ALL_ELEMENTS
from d3m.base import utils as d3m_utils
from datamart_isi.config import cache_file_storage_base_loc
from datamart_isi.config import default_temp_path

DEFAULT_TEMP_PATH = default_temp_path
_logger = logging.getLogger(__name__)
seed_dataset_store_location = os.path.join(cache_file_storage_base_loc, "datasets_cache")
wikifier_target_cache_exist_mark = "wikifier_target_cache_exist_mark"
if not os.path.exists(seed_dataset_store_location):
print(f'Creating directory: {seed_dataset_store_location}')
os.makedirs(seed_dataset_store_location, exist_ok=True)


class MetadataCache:
@staticmethod
def get_hash_key(input_data: pd.DataFrame) -> str:
"""
Function used to get the hash key for dataset cache
:param input_data:
:return: the hash key of the input data
"""
data_columns_list = input_data.columns.tolist()
data_columns_list.sort()
hash_generator = hashlib.md5()

hash_generator.update(str(data_columns_list).encode('utf-8'))
hash_key = str(hash_generator.hexdigest())
_logger.debug("Current columns are: " + str(data_columns_list))
_logger.debug("Current dataset's hash key is: " + hash_key)
return hash_key

@staticmethod
def save_specific_wikifier_targets(current_dataframe, column_to_p_node_dict,
cache_folder: str = seed_dataset_store_location) -> bool:
hash_key = MetadataCache.get_hash_key(current_dataframe)
# delete previous exist wikification target first
MetadataCache.delete_specific_p_nodes_file(current_dataframe)
file_loc = os.path.join(cache_folder, hash_key + "_metadata")
if os.path.exists(file_loc):
with open(file_loc, "r") as f:
current_dataset_metadata_dict = json.load(f)
else:
current_dataset_metadata_dict = dict()
try:
# add wikifier targets to file
current_dataset_metadata_dict[wikifier_target_cache_exist_mark] = True
for i in range(current_dataframe.shape[1]):
current_column_name = current_dataframe.columns[i]
if current_column_name in column_to_p_node_dict:
current_dataset_metadata_dict[current_column_name + "_wikifier_target"] = column_to_p_node_dict[current_column_name]

with open(file_loc, "w") as f:
json.dump(current_dataset_metadata_dict, f)
_logger.info("Saving wikifier targets to " + file_loc + " success!")
return True

except Exception as e:
_logger.error("Saving dataset failed!")
_logger.debug(e, exc_info=True)
return False

@staticmethod
def check_and_get_dataset_real_metadata(input_dataset: d3m_Dataset, cache_folder: str = seed_dataset_store_location):
"""
@@ -28,25 +82,27 @@ def check_and_get_dataset_real_metadata(input_dataset: d3m_Dataset, cache_folder
res_id, input_dataframe = d3m_utils.get_tabular_resource(dataset=input_dataset, resource_id=None)
input_columns = input_dataframe.columns.tolist()
input_columns.sort()
hash_generator = hashlib.md5()
hash_generator.update(str(input_columns).encode('utf-8'))
hash_key = str(hash_generator.hexdigest())
hash_key = MetadataCache.get_hash_key(input_dataframe)
_logger.debug("Current columns are: " + str(input_columns))
_logger.debug("Current dataset's hash key is: " + hash_key)
try:
file_loc = os.path.join(cache_folder, hash_key + "_metadata")
if os.path.exists(file_loc):
_logger.info("found exist metadata from seed datasets! Will use that")

with open(file_loc, "r") as f:
metadata_info = json.load(f)
_logger.info("The hit dataset id is: " + metadata_info["dataset_id"])

for i in range(len(input_columns)):
selector = (res_id, ALL_ELEMENTS, i)
current_column_name = input_dataframe.columns[i]
new_semantic_type = metadata_info[current_column_name]
input_dataset.metadata = input_dataset.metadata.update(selector, {"semantic_types": new_semantic_type})
return True, input_dataset
if "dataset_id" in metadata_info:
_logger.info("found exist metadata! Will use that")
_logger.info("The hit dataset id is: " + metadata_info["dataset_id"])
for i in range(len(input_columns)):
selector = (res_id, ALL_ELEMENTS, i)
current_column_name = input_dataframe.columns[i]
new_semantic_type = metadata_info[current_column_name]
input_dataset.metadata = input_dataset.metadata.update(selector, {"semantic_types": new_semantic_type})
return True, input_dataset
else:
_logger.info("Found file but the file do not contains metadata information for columns")
return False, input_dataset
else:
_logger.warning("No exist metadata from seed datasets found!")
return False, input_dataset
@@ -92,20 +148,19 @@ def save_metadata_from_dataset(current_dataset: d3m_Dataset, cache_folder: str =
:return: a Bool indicate saving success or not
"""
try:
current_dataset_metadata_dict = dict()
res_id, current_dataframe = d3m_utils.get_tabular_resource(dataset=current_dataset, resource_id=None)
hash_key = MetadataCache.get_hash_key(current_dataframe)
file_loc = os.path.join(cache_folder, hash_key + "_metadata")
if os.path.exists(file_loc):
with open(file_loc, "r") as f:
current_dataset_metadata_dict = json.load(f)
else:
current_dataset_metadata_dict = dict()

current_dataset_metadata_dict["dataset_id"] = current_dataset.metadata.query(())['id']
for i in range(current_dataframe.shape[1]):
each_metadata = current_dataset.metadata.query((res_id, ALL_ELEMENTS, i))
current_dataset_metadata_dict[current_dataframe.columns[i]] = each_metadata['semantic_types']
input_columns = current_dataframe.columns.tolist()
input_columns.sort()
hash_generator = hashlib.md5()
hash_generator.update(str(input_columns).encode('utf-8'))
hash_key = str(hash_generator.hexdigest())
_logger.debug("Current columns are: " + str(input_columns))
_logger.debug("Current dataset's hash key is: " + hash_key)
file_loc = os.path.join(cache_folder, hash_key + "_metadata")
with open(file_loc, "w") as f:
json.dump(current_dataset_metadata_dict, f)
_logger.info("Saving " + current_dataset_metadata_dict["dataset_id"] + " to " + file_loc + " success!")
@@ -115,3 +170,47 @@ def save_metadata_from_dataset(current_dataset: d3m_Dataset, cache_folder: str =
_logger.error("Saving dataset failed!")
_logger.debug(e, exc_info=True)
return False

@staticmethod
def generate_specific_meta_path(supplied_dataframe, cache_folder: str = seed_dataset_store_location):
hash_key = MetadataCache.get_hash_key(supplied_dataframe)
file_loc = os.path.join(cache_folder, hash_key + "_metadata")
return file_loc

@staticmethod
def get_specific_p_nodes(supplied_dataframe) -> typing.Optional[dict]:
specific_q_nodes_file = MetadataCache.generate_specific_meta_path(supplied_dataframe)
if os.path.exists(specific_q_nodes_file):
with open(specific_q_nodes_file, 'r') as f:
loaded_metadata = json.load(f)
specific_p_nodes_dict = dict()
# if no mark exist, it means this dataset's wikifier cache not saved, so we should return None
if wikifier_target_cache_exist_mark not in loaded_metadata:
return None
# otherwise, find corresponding wikifier targets
# it is possible to return an empty dict to indicate that no columns can be wikified
for key, value in loaded_metadata.items():
if key.endswith("_wikifier_target"):
column_name = key[:-16]
specific_p_nodes_dict[column_name] = value
return specific_p_nodes_dict
else:
return None

@staticmethod
def delete_specific_p_nodes_file(supplied_dataframe):
specific_q_nodes_file = MetadataCache.generate_specific_meta_path(supplied_dataframe)
if os.path.exists(specific_q_nodes_file):
with open(specific_q_nodes_file, "r") as f:
loaded_metadata = json.load(f)
keys_need_to_remove = []
for key in loaded_metadata.keys():
if key.endswith("_wikifier_target") or key == wikifier_target_cache_exist_mark:
keys_need_to_remove.append(key)
_logger.debug("Following specific wikifier targets will be removed:" + str(keys_need_to_remove))
for each_key in keys_need_to_remove:
loaded_metadata.pop(each_key)

with open(specific_q_nodes_file, "w") as f:
json.dump(loaded_metadata, f)
_logger.info("Delete specific p node files on {} success!".format(specific_q_nodes_file))
14 changes: 7 additions & 7 deletions datamart_isi/config.py
@@ -1,14 +1,11 @@
from . import config_services

import os
import socket

host_name = socket.gethostname()

if host_name == "dsbox02":
home_dir = "/data00/dsbox/datamart"
else:
home_dir = os.getenv("HOME")
home_dir = os.getenv("HOME")
# in the case that no correct home dir found (e.g. in docker)
if home_dir == "/":
home_dir = "/tmp"


default_datamart_url = config_services.get_default_datamart_url()
@@ -47,6 +44,7 @@

# elastic search to fetch FB embeddings
wikidata_uri_template = '<http://www.wikidata.org/entity/{}>'
# em_es_url = "http://kg2018a.isi.edu:9200"
# em_es_url = "http://sitaware.isi.edu:9200"
# em_es_index = "wiki_fb_embeddings_1"
# em_es_type = "vectors"
@@ -60,3 +58,5 @@
min_longitude_val = -180
max_latitude_val = 90
min_latitude_val = -90

maximum_accept_wikifier_size = 2000000
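
The host-specific branch for dsbox02 is gone: home_dir now always comes from $HOME, with /tmp as the fallback when a container reports "/". The new maximum_accept_wikifier_size constant is presumably a size cap for data sent to the wikifier; a hypothetical guard using it (the cell-count interpretation is an assumption, not stated in the diff):

# Hypothetical guard, not part of the diff: skip wikification for oversized inputs.
from datamart_isi import config

def should_wikify(df) -> bool:
    # Interpreting the limit as a cell count is an assumption.
    return df.shape[0] * df.shape[1] <= config.maximum_accept_wikifier_size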