diff --git a/docs/fact.slowdata.rst b/docs/fact.slowdata.rst
deleted file mode 100644
index 654edc7..0000000
--- a/docs/fact.slowdata.rst
+++ /dev/null
@@ -1,36 +0,0 @@
-fact.slowdata package
-=====================
-
-.. automodule:: fact.slowdata
-    :members:
-    :undoc-members:
-    :show-inheritance:
-
-Submodules
-----------
-
-fact.slowdata.correlation module
---------------------------------
-
-.. automodule:: fact.slowdata.correlation
-    :members:
-    :undoc-members:
-    :show-inheritance:
-
-fact.slowdata.settings module
------------------------------
-
-.. automodule:: fact.slowdata.settings
-    :members:
-    :undoc-members:
-    :show-inheritance:
-
-fact.slowdata.tools module
---------------------------
-
-.. automodule:: fact.slowdata.tools
-    :members:
-    :undoc-members:
-    :show-inheritance:
-
-
diff --git a/examples/fill_in.py b/examples/fill_in.py
deleted file mode 100644
index ef674bc..0000000
--- a/examples/fill_in.py
+++ /dev/null
@@ -1,278 +0,0 @@
-"""Slow Data into Mongo DB Filler.
-
-Usage:
-  fill_in.py [--help]
-  fill_in.py [--delete_all] [options]
-
-Options:
-  -h --help      Show this screen.
-  --delete_all   delete all collections prior to filling [default: False].
-  --base PATH    base path to slow files [default: /data/fact_aux]
-
-"""
-from docopt import docopt
-PROGRAM_OPTIONS = docopt(__doc__, version='Filler 1')
-import os
-import re
-import pyfact
-from glob import glob
-import time
-import pymongo
-import settings
-import fact
-
-
-def get_mongo_db_collection_by_name(database, collection_name):
-    """ create the collection if it does not yet exist.
-    If it exists, simply return it.
-    """
-    return getattr(database, collection_name)
-
-
-def delete_all_collections_from(database, omit=('system.indexes',)):
-    """ delete all collections from a database
-
-    If the name of a collection is in omit (default: ('system.indexes',)),
-    then this collection is not deleted.
-    """
-    for coll_name in database.collection_names():
-        if coll_name in omit:
-            continue
-
-        coll = get_mongo_db_collection_by_name(database, coll_name)
-        try:
-            coll.drop()
-        except pymongo.errors.OperationFailure as exception:
-            print("Was not able to drop collection {}\n"
-                  "Exception Details:\n"
-                  "args:{ex.args} | details:{ex.details} "
-                  "| message:{ex.message} | code:{ex.code}"
-                  .format(coll_name, ex=exception))
-
-
-class Filler(object):
-    database_name = settings.database_name
-    service_header_DB = "aux_meta"
-
-    def __init__(self, aux_file, connection):
-        self.aux_file = aux_file
-        self.path = self.aux_file.path
-        try:
-            self.fits_file = pyfact.Fits(self.path)
-        except TypeError:
-            # some fits files are apparently broken, we don't
-            # freak out about this, but report it nicely and go on.
-            self.fits_file = None
-
-        self._connection = connection
-        self.aux = getattr(connection, self.database_name)
-        self.aux_meta = getattr(self._connection, self.service_header_DB)
-
-    def fill_in(self):
-        """ main public method of a Filler
-        It does the actual job of filling the data from the fits file into the DB.
-        """
-
-        if self.fits_file is not None:
-            starttime = time.time()
-            self.__insert_service_description(self.aux_meta)
-            self.__insert_fitsfile(self.aux)
-            self.__make_time_index_for_service(self.aux)
-            print(self.__get_report(starttime))
-        else:
-            print("TypeError {}".format(self.path))
-
-    def __make_time_index_for_service(self, database):
-        """ ensure that a Time index exists
-
-        This simply ensures that the collection associated with the dim service,
-        which is stored inside the given file, indeed has an index built for the 'Time' field.
-
-        So in almost all cases, when this method is called, nothing will happen, because the index is already there.
-
-        This method only triggers an action on the DB side when there is a new service being filled in,
-        or when for some reason an index was dropped.
-        """
-        service_name = self.aux_file.service_name
-        coll = get_mongo_db_collection_by_name(database, service_name)
-        coll.ensure_index([("Time", 1)], unique=True, dropDups=True)
-
-    def __bulk_insert_in_collection(self, collection):
-        """ insert contents of fits_file into collection using a bulk insert
-        """
-        bulk = collection.initialize_ordered_bulk_op()
-        for _ in self.fits_file:
-            doc = self.__create_mongo_doc()
-            bulk.insert(doc)
-        try:
-            bulk.execute()
-        except pymongo.errors.BulkWriteError as bwe:
-            code = bwe.details['writeErrors'][0]['code']
-            if code != 11000:
-                raise bwe
-
-    # DEPRECATED: nobody is using this one, because it is so slow.
-    def __slow_insert_in_collection(self, coll):
-        """ insert contents of fits_file into collection, SLOW!
-        """
-        for _ in self.fits_file:
-            doc = self.__create_mongo_doc()
-            coll.insert(doc)
-
-    def __create_mongo_doc(self):
-        """ create a document suitable for mongo DB insertion from fits_file
-        """
-        doc = dict()
-        for field_name in self.fits_file:
-            cell_content = self.fits_file[field_name]
-
-            # mongo document fields can contain lists, or scalars,
-            # no numpy arrays!
-            # fits files only contain 1D numpy arrays
-            # (sometimes with only 1 element)
-            # so here I convert them to lists,
-            # and in the 1 element case, to scalars.
-            if len(cell_content) > 1:
-                doc[field_name] = cell_content.tolist()
-            elif len(cell_content) == 1:
-                doc[field_name] = float(cell_content[0])
-            elif len(cell_content) == 0:
-                # if we find an empty numpy array, we ignore it.
-                pass
-            else:
-                # negative length can't happen, so there is nothing to do.
-                pass
-
-        return doc
-
-    def __insert_fitsfile(self, database):
-        """ insert data from fits file into the database
-        """
-        fits_start_time = (self.fits_file.header['TSTARTI']
-                           + self.fits_file.header['TSTARTF'])
-        fits_stop_time = (self.fits_file.header['TSTOPI']
-                          + self.fits_file.header['TSTOPF'])
-        collection = get_mongo_db_collection_by_name(database,
-                                                     self.aux_file.service_name)
-
-        start_found = False
-        stop_found = False
-        if collection.find_one({"Time": fits_start_time}) is not None:
-            start_found = True
-        if collection.find_one({"Time": fits_stop_time}) is not None:
-            stop_found = True
-        if start_found and stop_found:
-            return
-
-        self.__bulk_insert_in_collection(collection)
-
-    def __insert_service_description(self, database):
-        """ insert fits header information into database
-        """
-        service_name = self.aux_file.service_name
-        coll = database.service_descriptions
-
-        # only insert this if it has never been inserted before
-        if coll.find_one({'_id': service_name}) is not None:
-            return
-
-        # the document should contain the header information from the fits file.
-        header = self.fits_file.header
-        comments = self.fits_file.header_comments
-        doc = {}
-        for k in header:
-            if k in comments:
-                doc[k] = [header[k], comments[k]]
-            else:
-                doc[k] = header[k]
-        doc['_id'] = service_name
-        coll.insert(doc)
-
-    def __get_report(self, starttime):
-        """ create a short textual report about the file just filled and how long it took
-        """
-        fits_file_path = self.path
-        fits_file = pyfact.Fits(fits_file_path)
-        report = ("{time_str} : {date.year}/{date.month}/{date.day}:"
-                  " {svc_name:.<40} : {len:6d} : {dura:2.3f}".format(
-                      date=self.aux_file.date,
-                      svc_name=self.aux_file.service_name,
-                      dura=time.time() - starttime,
-                      len=fits_file.header["NAXIS2"],
-                      time_str=time.strftime("%Y/%m/%d-%H:%M:%S")
-                  ))
-
-        return report
-
-
-class AuxFile(object):
-    """ can tell:
-    its service_name,
-    its date and
-    whether it is_interesting at all.
-    """
-    cwd = os.path.dirname(os.path.realpath(__file__))
-    whitelist = open(cwd + "/service_whitelist.txt").read().splitlines()
-
-    def __init__(self, path):
-        self.path = path
-
-    @property
-    def service_name(self):
-        """Grab the service name from a fits_file_path
-
-        path : should be a full file path
-
-        The service name is defined as any number of characters
-        A-Z (or underscore _) which follows the 8-digit date.
-        Any file extension like ".fits" or ".fits.gz",
-        or even no file extension at all, is permitted.
-        """
-        filename = os.path.split(self.path)[1]
-        match = re.match(r"^\d{8}\.([A-Z_]+)", filename)
-        return match.group(1)
-
-    @property
-    def date(self):
-        """ return datetime.datetime object (UTC aware)
-        """
-        filename = os.path.split(self.path)[1]
-        match = re.match(r"^(\d{8})\.([A-Z_]+)", filename)
-        return fact.run2dt(match.group(1))
-
-    @property
-    def is_interesting(self):
-        """ check if the service_name of this file is in the whitelist
-        """
-        return self.service_name in self.whitelist
-
-
-def interesting_files_under(base):
-    """ generate AuxFiles for the interesting .fits files below this basepath
-    """
-    search_path = os.path.join(base, '*/*/*/*.fits')
-    all_paths = glob(search_path)
-
-    sorted_paths = sorted(all_paths, reverse=True)
-
-    for path in sorted_paths:
-        aux_file = AuxFile(path)
-        if aux_file.is_interesting:
-            yield aux_file
-
-
-def main(opts):
-    """ main function of this filler
-    """
-    connection = pymongo.MongoClient(settings.host, settings.port)
-
-    if opts['--delete_all']:
-        aux = getattr(connection, settings.database_name)
-        delete_all_collections_from(aux)
-
-    for aux_file in interesting_files_under(opts['--base']):
-        filler = Filler(aux_file, connection)
-        filler.fill_in()
-
-if __name__ == "__main__":
-    main(PROGRAM_OPTIONS)
diff --git a/examples/slowdata_db.py b/examples/slowdata_db.py
deleted file mode 100644
index 4859f02..0000000
--- a/examples/slowdata_db.py
+++ /dev/null
@@ -1,38 +0,0 @@
-from __future__ import print_function
-import sys
-import matplotlib.pyplot as plt
-from fact.time import iso2dt, fjd, OFFSET
-from fact.slowdata import connect
-import pymongo
-
-start_time = fjd(iso2dt("01.04.2013 12:34:56.789"))
-stop_time = fjd(iso2dt("03.04.2013"))
-
-print("connecting to FACT slowdata DB ...")
-try:
-    db = connect(
-        host='localhost',
-        port=37017,
-        db_name='aux')
-except pymongo.errors.ConnectionFailure as e:
-    print("... was not able to connect to DB. maybe the tunnel is not open?")
-    print("\n\n")
-    sys.exit(-1)
-
-
-print("requesting some data from the slowdata DB, might take a while...")
-interesting_data = db.MAGIC_WEATHER_DATA.from_until(start_time, stop_time)
-
-# interesting_data is a numpy structured array
-print("dtype of interesting_data {}".format(interesting_data.dtype))
-
-plt.figure()
-plt.plot_date(interesting_data['Time'] + OFFSET,
-              interesting_data['T'], '.', label="outside temperature")
-plt.plot_date(interesting_data['Time'] + OFFSET,
-              interesting_data['T_dew'], '.', label="dewpoint")
-plt.ylabel("Temperature [deg C]")
-plt.xlabel("Date")
-plt.title("Some interesting_data")
-plt.legend()
-plt.show()
diff --git a/fact/VERSION b/fact/VERSION
index 166c9e2..4e8f395 100644
--- a/fact/VERSION
+++ b/fact/VERSION
@@ -1 +1 @@
-0.25.2
+0.26.0
diff --git a/fact/slowdata/__init__.py b/fact/slowdata/__init__.py
deleted file mode 100644
index 1617ee5..0000000
--- a/fact/slowdata/__init__.py
+++ /dev/null
@@ -1,119 +0,0 @@
-''' FACT slow database frontend
-
-provided for your convenience.
-'''
-import numpy as np
-from pymongo import MongoClient
-from . import settings
-from . import tools
-from . import correlation
-
-
-__all__ = ['connect', 'correlation']
-
-
-class ServiceField(object):
-    ''' represents one Field of a Service
-    '''
-
-    def __init__(self, collection, key):
-        self.__collection = collection
-        self.__key = key
-
-    def from_until(self, start, end):
-        ''' retrieve this field from start to end date as a numpy array.
-        '''
-        cursor = self.__collection.find({'Time': {'$gte': start, '$lt': end}})
-        return tools.cursor_to_structured_array(cursor)
-
-
-class Service(object):
-    ''' represents a FACT slow data service
-    '''
-
-    def __init__(self, collection):
-        self.__collection = collection
-        self.__get_service_field_names(collection)
-        for key in self.__keys:
-            setattr(
-                self,
-                key,
-                ServiceField(collection, key))
-
-        self.__keys = None
-
-    def __get_service_field_names(self, coll):
-        ''' retrieve the field names of this service
-
-        by using one example document.
-        '''
-        self.__keys = coll.find_one()
-        del self.__keys['_id']
-
-    def from_until(self, start, end, sample_period=None, skip=None, fields=None):
-        ''' retrieve this service from start to end date as a numpy array.
-
-        start : start time in FJD
-        end : end time in FJD
-
-        keyword arguments:
-
-        sample_period : time (in days) into which the request should be split --> skip
-        skip : number of periods which should be skipped during the request
-        fields : list of field names to be requested [default: all fields]
-        '''
-        if sample_period is not None:
-            sample_boundaries = np.arange(start, end, sample_period)
-        else:
-            sample_boundaries = np.array([start, end])
-
-        samples_start = sample_boundaries[0:-1:skip]
-        samples_end = sample_boundaries[1::skip]
-        dict_list = []
-        for _start, _end in zip(samples_start, samples_end):
-            start_stop_dict = {'Time': {
-                '$gte': float(_start),
-                '$lt': float(_end)}
-            }
-            dict_list.append(start_stop_dict)
-
-        if fields is not None:
-            cursor = self.__collection.find({'$or': dict_list}, fields=fields)
-        else:
-            cursor = self.__collection.find({'$or': dict_list})
-
-        return tools.cursor_to_structured_array(cursor)
-
-
-class AuxDataBaseFrontEnd(object):
-
-    def __init__(self, database):
-        self.database = database
-        self.__service_names = None  # to be initialised in __init_service_names
-        self.__fill_in_services()
-
-    def __fill_in_services(self):
-        self.__init_service_names()
-        for service_name in self.__service_names:
-            service = Service(self.database[service_name])
-            setattr(self, service_name, service)
-
-    def __init_service_names(self):
-        self.__service_names = list()
-        for collection_name in self.database.collection_names():
-            if 'system.indexes' not in collection_name:
-                self.__service_names.append(collection_name)
-
-
-def connect(host=None, port=None, db_name=None):
-    if host is None:
-        host = settings.host
-
-    if port is None:
-        port = settings.port
-
-    if db_name is None:
-        db_name = settings.db_name
-
-    client = MongoClient(host, port)
-    return AuxDataBaseFrontEnd(getattr(client, db_name))
diff --git a/fact/slowdata/correlation.py b/fact/slowdata/correlation.py
deleted file mode 100644
index f88ea96..0000000
--- a/fact/slowdata/correlation.py
+++ /dev/null
@@ -1,151 +0,0 @@
-''' library of functions for correlating services
-'''
-import numpy as np
-from scipy import interpolate
-
-
-def interpolator(support, times_to_be_evaluated):
-    '''
-    support : a structured array with a 'Time' field
-        and a lot of other fields to be interpolated.
-
-    times_to_be_evaluated : a 1D array-like holding the times
-        where the interpolation should be done
-    '''
-    output_array = np.zeros(len(times_to_be_evaluated), dtype=support.dtype)
-    output_array['Time'] = times_to_be_evaluated
-
-    field_names = [name for name in support.dtype.names if 'Time' not in name]
-    for name in field_names:
-        output_array[name] = interpolate.interp1d(
-            support['Time'],
-            support[name], axis=0
-        )(times_to_be_evaluated)
-    return output_array
-
-
-class CorrelationByInterpolation(object):
-
-    def __init__(self, service_1, service_2, delta_in_seconds):
-        self.service_1 = service_1
-        self.service_2 = service_2
-        self.delta_in_seconds = delta_in_seconds
-        self.services = [service_1, service_2]
-
-    def correlate(self):
-        ''' correlate by interpolation
-        '''
-        averaged_times = self.__create_average_times()
-        common_averaged_times = self.__common_times(averaged_times)
-        new_service_2 = interpolator(self.service_2, common_averaged_times)
-        new_service_1 = interpolator(self.service_1, common_averaged_times)
-        return new_service_1, new_service_2
-
-    def __common_times(self, averaged_times):
-        ''' crop times for interpolation
-
-        times : 1D array containing times in FACT format
-        services : list of services (having a 'Time' field)
-
-        times is cropped such that it does not contain any time
-        earlier than the earliest time in any service, and no time later
-        than the latest time in any service.
-        '''
-        maximal_earliest = max([s['Time'][0] for s in self.services])
-        minimal_latest = min([s['Time'][-1] for s in self.services])
-        start, stop = np.searchsorted(
-            averaged_times,
-            [maximal_earliest, minimal_latest])
-        return averaged_times[start:stop]
-
-    def __create_average_times(self):
-        ''' create 'near' times from the 'Time' fields of self.services
-
-        Near times are time stamps which are nearer to each other than delta;
-        each group of times which is near enough creates one averaged timestamp,
-        which is simply the average of the entire group of times.
-        '''
-
-        list_of_averages = self.__make_average_bad_name(
-            self.service_1['Time'],
-            self.service_2['Time'])
-        list_of_averages += self.__make_average_bad_name(
-            self.service_2['Time'],
-            self.service_1['Time'])
-        return np.array(list_of_averages)
-
-    def __make_average_bad_name(self, time1, time2):
-        ''' helper for the function above. This one is not easy to explain;
-        maybe this means it needs refactoring.
-        '''
-        # convert times to seconds (without modifying the caller's arrays)
-        time1 = time1 * 24 * 3600
-        time2 = time2 * 24 * 3600
-        list_of_averages = []
-        left_indices = np.searchsorted(
-            time1,
-            time2 - self.delta_in_seconds)
-        right_indices = np.searchsorted(
-            time1,
-            time2 + self.delta_in_seconds)
-
-        for index_in_2, tup in enumerate(zip(left_indices, right_indices)):
-            left, right = tup
-            this_time2 = time2[index_in_2]
-            list_of_averages.append(
-                (time1[left:right].sum() + this_time2) / (1 + len(time1[left:right]))
-            )
-        return list_of_averages
-
-
-class CorrelationByIdentification(object):
-
-    def __init__(self, service_1, service_2, delta_in_seconds):
-        if service_1.shape[0] > service_2.shape[0]:
-            self.longer = service_1
-            self.shorter = service_2
-            self.__long_1_short_2 = True
-        else:
-            self.longer = service_2
-            self.shorter = service_1
-            self.__long_1_short_2 = False
-
-        self.delta_in_seconds = delta_in_seconds
-
-        # a version of self.longer which contains as many rows
-        # as self.shorter; the timestamps in new_longer are as close to those
-        # in self.shorter as possible
-        self.__new_longer = None
-
-    def correlate(self):
-        ''' return parts of service_1 and service_2 with equal length and matching timestamps
-        '''
-        self.__new_longer = self.__identify()
-        return self.__cut_for_delta()
-
-    def __identify(self):
-        ''' return the part of longer which corresponds best to shorter
-        '''
-        t_short = self.shorter['Time']
-        t_long = self.longer['Time']
-        indices_in_longer = np.searchsorted(a=t_long, v=t_short)
-        a_bit_smaller = t_long.take(indices_in_longer - 1, mode='clip')
-        a_bit_larger = t_long.take(indices_in_longer, mode='clip')
-        indices_nearest = np.where(
-            (t_short - a_bit_smaller) < (a_bit_larger - t_short),
-            indices_in_longer - 1, indices_in_longer
-        )
-        return self.longer.take(indices_nearest, mode='clip')
-
-    def __cut_for_delta(self):
-        ''' crop the equal-length services according to their timestamps
-        '''
-        time1 = self.shorter['Time'] * 24 * 3600
-        time2 = self.__new_longer['Time'] * 24 * 3600
-        good = np.abs(time1 - time2) < self.delta_in_seconds
-
-        if self.__long_1_short_2:
-            return self.__new_longer[good], self.shorter[good]
-        else:
-            return self.shorter[good], self.__new_longer[good]
diff --git a/fact/slowdata/settings.py b/fact/slowdata/settings.py
deleted file mode 100644
index e523a74..0000000
--- a/fact/slowdata/settings.py
+++ /dev/null
@@ -1,3 +0,0 @@
-db_name = 'aux'
-host = 'localhost'
-port = 37017
diff --git a/fact/slowdata/tools.py b/fact/slowdata/tools.py
deleted file mode 100644
index 35bd4bf..0000000
--- a/fact/slowdata/tools.py
+++ /dev/null
@@ -1,70 +0,0 @@
-""" library of (hopefully) useful functions for the slow data DB interface
-"""
-import numpy as np
-import pymongo
-
-
-def cursor_to_rec_array(cursor):
-    """ convert a pymongo.cursor.Cursor to a numpy recarray
-    """
-    array = cursor_to_structured_array(cursor)
-    return array.view(np.recarray)
-
-
-def cursor_to_structured_array(cursor):
-    """ convert a pymongo.cursor.Cursor to a numpy structured array
-    """
-    number_of_docs = cursor.count()
-    # if number_of_docs > 1000:
-    #     logging.warning("loading {} documents from database"
-    #                     " .. might take a while".format(number_of_docs))
-
-    structured_array_dtype = make_numpy_dtype_from_cursor(cursor)
-    array = np.zeros(number_of_docs, structured_array_dtype)
-
-    for counter, document in enumerate(cursor):
-        for field_name in structured_array_dtype.names:
-            try:
-                array[field_name][counter] = document[field_name]
-            except KeyError:
-                array[field_name][counter] = np.nan
-
-    return array
-
-
-def make_numpy_dtype_from_cursor(cursor):
-    """ infer the datatype of the structured array from a document of this cursor
-    """
-    collection_of_this_cursor = cursor.collection
-    # get the newest entry from this collection;
-    # this one defines the dtype of the numpy array,
-    # and we want to stick to the newest format.
-    example_document = collection_of_this_cursor.find_one(
-        {},
-        sort=[("Time", pymongo.DESCENDING)]
-    )
-
-    if example_document is None:
-        # collection is empty
-        raise LookupError('associated collection of cursor is empty.')
-
-    return make_numpy_dtype_from_document(example_document)
-
-
-def make_numpy_dtype_from_document(doc):
-    """ infer the datatype of the structured array from a document
-    """
-    list_of_names_n_types = []
-    for field_name in doc:
-        if '_id' in field_name:
-            continue
-        element = doc[field_name]
-        element_array = np.array(element)
-        list_of_names_n_types.append(
-            (str(field_name),
-             element_array.dtype.str,
-             element_array.shape)
-        )
-
-    return np.dtype(list_of_names_n_types)
diff --git a/setup.py b/setup.py
index 3041288..bff3e7e 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,6 @@
         'numpy',
         'pandas',
         'peewee>=3',
-        'pymongo>=2.7',
         'pymysql',
         'python-dateutil',
         'scipy',
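
Note for anyone who relied on the removed fact.slowdata frontend: this change only drops the client code and the pymongo requirement; the aux database itself is untouched. The snippet below is a minimal sketch of what connect() plus Service.from_until(start, stop) did, assuming pymongo is now installed separately, the database is still reachable through the usual SSH tunnel with the defaults from the removed fact/slowdata/settings.py (localhost:37017, database 'aux'), and using the MAGIC_WEATHER_DATA service and field names ('Time', 'T', 'T_dew') from the removed example script.

    # Sketch of a raw pymongo query replacing fact.slowdata (assumptions: pymongo
    # installed separately, aux DB tunnelled to localhost:37017 as before).
    import pandas as pd
    from pymongo import MongoClient

    from fact.time import iso2dt, fjd, OFFSET

    start = fjd(iso2dt("01.04.2013 12:34:56.789"))  # times are stored in FJD
    stop = fjd(iso2dt("03.04.2013"))

    client = MongoClient('localhost', 37017)
    collection = client['aux']['MAGIC_WEATHER_DATA']

    # essentially the query Service.from_until() issued internally
    cursor = collection.find({'Time': {'$gte': start, '$lt': stop}})
    weather = pd.DataFrame(list(cursor)).drop(columns='_id')

    # adding fact.time.OFFSET converts FJD to matplotlib date numbers,
    # as the removed examples/slowdata_db.py did before plotting
    weather['mpl_time'] = weather['Time'] + OFFSET
    print(weather[['Time', 'T', 'T_dew']].head())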