From 0d661ec416ffd74ddf937824c7a9215bf6e1b8c4 Mon Sep 17 00:00:00 2001
From: nischal
Date: Sat, 30 Jul 2016 22:11:16 +0530
Subject: [PATCH] Adding linear regression and hypothesis 2 skeleton

---
 poget/analytics/ml/linear_regression.py     | 109 +++++++++++++++++
 poget/analytics/ml/terminal_transactions.py | 125 ++++++++++++++++++++
 2 files changed, 234 insertions(+)
 create mode 100644 poget/analytics/ml/linear_regression.py
 create mode 100644 poget/analytics/ml/terminal_transactions.py

diff --git a/poget/analytics/ml/linear_regression.py b/poget/analytics/ml/linear_regression.py
new file mode 100644
index 0000000..aff8b76
--- /dev/null
+++ b/poget/analytics/ml/linear_regression.py
@@ -0,0 +1,109 @@
+import os
+import shutil
+
+from pyspark import SparkConf, SparkContext
+from pyspark.sql import SQLContext
+from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD
+
+from poget import LOGGER
+
+
+class LinearRegression:
+
+    def __init__(self):
+        # configuring spark
+        self.spark_conf = SparkConf()
+        self.sc = SparkContext(conf=self.spark_conf)
+        self.sql_context = SQLContext(self.sc)
+
+    def test_train(self, df, target, train_split, test_split):
+        try:
+            LOGGER.info("Training linear regression with a held-out test split")
+
+            spark_df = self.sql_context.createDataFrame(df)
+            feature_columns = spark_df.columns
+            feature_columns.remove(target)
+
+            train, test = spark_df.randomSplit([train_split, test_split], seed=1000000)
+
+            X_train = train.select(*feature_columns).rdd.map(list)
+            y_train = train.select(target).rdd.map(lambda row: row[0])
+
+            # pair each label with its feature vector
+            zipped = y_train.zip(X_train)
+            train_data = zipped.map(lambda pair: LabeledPoint(pair[0], pair[1]))
+
+            linear_model = LinearRegressionWithSGD.train(train_data)
+
+            X_test = test.select(*feature_columns).rdd.map(list)
+            y_test = test.select(target).rdd.map(lambda row: row[0])
+
+            prediction = X_test.map(lambda features: float(linear_model.predict(features)))
+            prediction_and_label = prediction.zip(y_test)
+
+            # mean squared error over the held-out split
+            mse = prediction_and_label.map(lambda pair: (pair[0] - pair[1]) ** 2).reduce(lambda x, y: x + y) / prediction_and_label.count()
+
+            LOGGER.info("Test MSE: %s" % mse)
+        except Exception as e:
+            raise e
+
+    def train(self, df, target):
+        try:
+            LOGGER.info("Training linear regression on the full data set")
+
+            spark_df = self.sql_context.createDataFrame(df)
+            feature_columns = spark_df.columns
+            feature_columns.remove(target)
+
+            X_train = spark_df.select(*feature_columns).rdd.map(list)
+            y_train = spark_df.select(target).rdd.map(lambda row: row[0])
+
+            zipped = y_train.zip(X_train)
+            train_data = zipped.map(lambda pair: LabeledPoint(pair[0], pair[1]))
+
+            linear_model = LinearRegressionWithSGD.train(train_data)
+
+            self.model = linear_model
+        except Exception as e:
+            raise e
+
+    def persist(self, location):
+        try:
+            LOGGER.info("Writing the model to location %s" % location)
+
+            # model.save will not overwrite, so clear any earlier
+            # 'data' and 'metadata' directories first
+            for sub_directory in ('data', 'metadata'):
+                sub_location = os.path.join(location, sub_directory)
+                if os.path.exists(sub_location):
+                    LOGGER.info("Removing directory %s" % sub_location)
+                    shutil.rmtree(sub_location)
+
+            self.model.save(self.sc, location)
+        except Exception as e:
+            raise e
+
+    def predict(self, df):
+        try:
+            LOGGER.info("Predicting using linear regression")
+
+            spark_df = self.sql_context.createDataFrame(df)
+            inp_data = spark_df.rdd.map(list)
+            result = self.model.predict(inp_data).collect()
+            LOGGER.info("Predicted output is %s" % str(result))
+            return result
+        except Exception as e:
+            raise e
+
+    def load(self, location):
+        try:
+            # models trained with LinearRegressionWithSGD are loaded back
+            # through LinearRegressionModel, not the trainer class
+            self.model = LinearRegressionModel.load(self.sc, location)
+        except Exception as e:
+            raise e
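+
+
+if __name__ == '__main__':
+    # Minimal smoke test for this wrapper: a sketch only. The toy frame
+    # below is invented for illustration and is not part of the pipeline.
+    import pandas as pd
+
+    toy_df = pd.DataFrame({'x1': [float(i) for i in range(20)],
+                           'x2': [float(i % 5) for i in range(20)],
+                           'target': [2.0 * i + 1.0 for i in range(20)]})
+
+    lr = LinearRegression()
+    lr.test_train(df=toy_df, target='target', train_split=0.8, test_split=0.2)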
diff --git a/poget/analytics/ml/terminal_transactions.py b/poget/analytics/ml/terminal_transactions.py
new file mode 100644
index 0000000..b25c7f5
--- /dev/null
+++ b/poget/analytics/ml/terminal_transactions.py
@@ -0,0 +1,125 @@
+import traceback
+import json
+
+import pandas as pd
+
+from poget.utils.db import DBConn
+import poget.utils.ml as mlUtils
+from poget import LOGGER
+
+from poget.analytics.ml.linear_regression import LinearRegression
+
+
+# ## Hypothesis 2
+#
+# Given a time slot and terminal code, predict the number of transactions
+class TerminalTraffic:
+
+    def __init__(self):
+        self.name = ''
+        self.main_directory = 'poget'
+        self.models_directory = 'models'
+
+    def get_data(self):
+        try:
+            conn = DBConn().get_connection()
+            query = '''SELECT * FROM trips'''
+            LOGGER.info("Reading data from db: %s" % query)
+            df = pd.read_sql(query, con=conn)
+            return df
+        except Exception as e:
+            LOGGER.error(traceback.format_exc())
+            raise e
+
+    # ### Feature engineering
+    #
+    # * Find & replace null values
+    # * Hour window
+    # * Target generation
+    # * Removal of non-categorical fields
+    def generate_feature_from_data(self, inp_data):
+        try:
+            LOGGER.info("Generating features from data")
+            LOGGER.info("Input data has the shape %s" % str(inp_data.shape))
+
+            inp_data['start_hour'] = inp_data["Start Date"].apply(mlUtils.get_hour)
+            inp_data['start_day'] = inp_data["Start Date"].apply(mlUtils.get_day)
+
+            inp_data['end_hour'] = inp_data["End Date"].apply(mlUtils.get_hour)
+            inp_data['end_day'] = inp_data["End Date"].apply(mlUtils.get_day)
+
+            LOGGER.info(inp_data.head())
+
+            # now count the traffic for an hour, given a day of the week and a terminal
+            start_df = inp_data.groupby(by=["start_hour", "start_day", "Start Terminal"]).count().copy()
+            start_df = start_df.reset_index()
+
+            LOGGER.info("creating start df")
+            # keep only the required columns
+            start_df = start_df.loc[:, ["start_hour", "start_day", "Start Terminal", "Trip ID"]]
+            start_df.columns = ["hour", "day", "terminal_code", "trip_id"]
+            LOGGER.info(start_df.head())
+
+            LOGGER.info("creating end df")
+            end_df = inp_data.groupby(by=["end_hour", "end_day", "End Terminal"]).count().copy()
+            end_df = end_df.reset_index()
+            end_df = end_df.loc[:, ["end_hour", "end_day", "End Terminal", "Trip ID"]]
+            end_df.columns = ["hour", "day", "terminal_code", "trip_id"]
+            LOGGER.info(end_df.head())
+
+            LOGGER.info("merging start and end df")
+            # merge the start and end frames to get total traffic per terminal
+            merged_df = start_df.merge(end_df, how="inner", on=["hour", "day", "terminal_code"])
+
+            merged_df["target"] = merged_df["trip_id_x"] + merged_df["trip_id_y"]
+            merged_df = merged_df.loc[:, ["hour", "day", "terminal_code", "target"]]
+
+            return merged_df
+        except Exception as e:
+            LOGGER.error(traceback.format_exc())
+            raise e
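+
+    # A worked example of the target construction above (numbers invented
+    # for illustration): if terminal 50 has 3 trips starting and 2 trips
+    # ending on Mondays (day=0) during the 8am hour, each groupby yields
+    # one row (hour=8, day=0, terminal_code=50) with counts 3 and 2, and
+    # the inner merge produces target = trip_id_x + trip_id_y = 5.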
+    def get_features_for_prediction(self, data):
+        try:
+            json_data = json.dumps(data)
+            df = pd.read_json(json_data)
+            LOGGER.info("feature set is")
+            LOGGER.info(df.head())
+            return df
+        except Exception:
+            LOGGER.error(traceback.format_exc())
+            raise
+
+    def train_test_model(self, df):
+        try:
+            # use this data to train the model and report test error
+            model = LinearRegression()
+            LOGGER.info("training the model")
+            model.test_train(df=df, target='target', train_split=0.8, test_split=0.2)
+        except Exception as e:
+            LOGGER.error(traceback.format_exc())
+            raise e
+
+    def train_model(self, df):
+        try:
+            # use this data to train the model and return it for persistence
+            model = LinearRegression()
+            model.train(df=df, target='target')
+            return model
+        except Exception as e:
+            LOGGER.error(traceback.format_exc())
+            raise e
+
+
+if __name__ == '__main__':
+    terminal_traffic = TerminalTraffic()
+    data = terminal_traffic.get_data()
+    df = terminal_traffic.generate_feature_from_data(inp_data=data)
+    terminal_traffic.train_test_model(df=df)
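+
+    # A sketch of the full train/persist/predict cycle. It is kept commented
+    # out: LinearRegression() creates a SparkContext, only one of which may
+    # exist per process, so this cannot run in the same session as the
+    # test_train call above. The model directory path is an assumption and
+    # not part of this patch.
+    #
+    #     model = terminal_traffic.train_model(df=df)
+    #     model.persist('poget/models/terminal_traffic')
+    #     model.load('poget/models/terminal_traffic')
+    #     model.predict(df[["hour", "day", "terminal_code"]])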