Commit

Merge branch 'master' of github.com:unnati-xyz/fifthel-2016-workshop
raghothams committed Jul 30, 2016
2 parents ab9ac24 + 0d661ec commit 430d95e
Showing 2 changed files with 234 additions and 0 deletions.
109 changes: 109 additions & 0 deletions poget/analytics/ml/linear_regression.py
@@ -0,0 +1,109 @@
import os
import shutil
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

from poget import LOGGER


class LinearRegression:

def __init__(self):
# configuring spark
self.spark_conf = SparkConf()
self.sc = SparkContext(conf=self.spark_conf)
self.sql_context = SQLContext(self.sc)

def test_train(self, df, target, train_split, test_split):
try:
LOGGER.info("Generation linear regression")

spark_df = self.sql_context.createDataFrame(df)
feature_columns = spark_df.columns
feature_columns.remove(target)

train, test = spark_df.randomSplit([train_split, test_split], seed=1000000)

X_train = train.select(*feature_columns).map(lambda x: list(x))
y_train = train.select(target).map(lambda x: x[0])

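            # pair each label with its feature vector; MLlib's regression trainer expects an RDD of LabeledPoint(label, features)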
zipped = y_train.zip(X_train)
train_data = zipped.map(lambda x: LabeledPoint(x[0], x[1]))

linear_model = LinearRegressionWithSGD.train(train_data)

X_test = test.select(*feature_columns).map(lambda x: list(x))
y_test = test.select(target).map(lambda x: x[0])

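            # score the held-out features and pair each prediction with its true label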
prediction = X_test.map(lambda lp: (float(linear_model.predict(lp))))
prediction_and_label = prediction.zip(y_test)

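            # mean squared error over the test set: average of the squared (prediction - label) differences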
            MSE = prediction_and_label.map(lambda vp: (vp[0] - vp[1]) ** 2).reduce(lambda x, y: x + y) / prediction_and_label.count()

            LOGGER.info("Mean squared error on test data: %s" % MSE)
except Exception as e:
raise e

def train(self, df, target):
try:
LOGGER.info("Generation linear regression")

spark_df = self.sql_context.createDataFrame(df)
feature_columns = spark_df.columns
feature_columns.remove(target)


X_train = spark_df.select(*feature_columns).map(lambda x: list(x))
y_train = spark_df.select(target).map(lambda x: x[0])

zipped = y_train.zip(X_train)
train_data = zipped.map(lambda x: LabeledPoint(x[0], x[1]))

linear_model = LinearRegressionWithSGD.train(train_data)

self.model = linear_model

except Exception as e:
raise e


def persist(self, location):
try:
LOGGER.info("Writing the model to location %s"%location)
data = 'data'
meta_data = 'metadata'

data_location = os.path.join(location, data)
if os.path.exists(data_location):
LOGGER.info("Removing directory %s"%data_location)
shutil.rmtree(data_location)

            metadata_location = os.path.join(location, meta_data)
            if os.path.exists(metadata_location):
                LOGGER.info("Removing directory %s"%metadata_location)
                shutil.rmtree(metadata_location)

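            # MLlib's model.save writes 'data' and 'metadata' sub-directories under the given location, so stale copies are removed first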
self.model.save(self.sc, location)
except Exception as e:
raise e


def predict(self, df):
try:
LOGGER.info("Predicting using linear regression")
spark_df = self.sql_context.createDataFrame(df)
feature_columns = spark_df.columns
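            # convert each DataFrame row into a plain feature list so the MLlib model can score it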
            inp_data = spark_df.select(*feature_columns).map(lambda x: list(x))
            result = self.model.predict(inp_data).collect()
LOGGER.info("Predicted output is %s"%str(result))
return result

except Exception as e:
raise e

def load(self, location):
try:
            self.model = LinearRegressionModel.load(self.sc, location)
except Exception as e:
raise e
125 changes: 125 additions & 0 deletions poget/analytics/ml/terminal_transactions.py
@@ -0,0 +1,125 @@
import traceback
import json
import pandas as pd
from poget.utils.db import DBConn
import poget.utils.ml as mlUtils
from poget import LOGGER

from poget.analytics.ml.linear_regression import LinearRegression


# ## Hypothesis 2
#
# Given a time slot and terminal code, predict number of transactions
class TerminalTraffic:

def __init__(self):
self.name = ''
self.main_directory = 'poget'
self.models_directory = 'models'

def get_data(self):
try:
conn = DBConn().get_connection()
query = '''SELECT * from trips'''
LOGGER.info("Reading data from db : %s" % (query))
df = pd.read_sql(query, con=conn)
return df

except Exception as e:
LOGGER.error(traceback.format_exc())
raise e

# ### Feature engineering
#
# * Find & replace null values
# * Hour window
# * Target generation
# * Removal of non categorical fields

def generate_feature_from_data(self, inp_data):
try:
LOGGER.info("Generating features from data")
LOGGER.info("Input data has the shape %s" % str(inp_data.shape))

inp_data['start_hour'] = inp_data["Start Date"].apply(mlUtils.get_hour)
inp_data['start_day'] = inp_data["Start Date"].apply(mlUtils.get_day)

inp_data['end_hour'] = inp_data["End Date"].apply(mlUtils.get_hour)
inp_data['end_day'] = inp_data["End Date"].apply(mlUtils.get_day)

LOGGER.info(inp_data.head())

            # now let's count the trips per hour for a given day of the week and terminal
start_df = inp_data.groupby(by=["start_hour", "start_day", "Start Terminal"]).count().copy()
start_df = start_df.reset_index()

LOGGER.info(start_df.head())

LOGGER.info("creating start df")
# getting only the required columns
            start_df = start_df.loc[:, ["start_hour", "start_day", "Start Terminal", "Trip ID"]]
            start_df.columns = ["hour", "day", "terminal_code", "trip_id"]
            LOGGER.info(start_df.head())


LOGGER.info("creating end df")
end_df = inp_data.groupby(by=["end_hour", "end_day", "End Terminal"]).count().copy()
end_df = end_df.reset_index()
            end_df = end_df.loc[:, ["end_hour", "end_day", "End Terminal", "Trip ID"]]
end_df.columns = ["hour", "day", "terminal_code", "trip_id"]
LOGGER.info(end_df.head())

LOGGER.info("merging start and end df")
# merge start and end data frames to generate traffic counts for a terminal
merged_df = start_df.merge(end_df, how="inner", on=["hour", "day", "terminal_code"])


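            # total traffic for a terminal in a slot = trips that started there + trips that ended there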
merged_df["target"] = merged_df["trip_id_x"] + merged_df["trip_id_y"]
            merged_df = merged_df.loc[:, ["hour", "day", "terminal_code", "target"]]

return merged_df

except Exception as e:
LOGGER.error(traceback.format_exc())
raise e

def get_features_for_prediction(self, data):

try:
json_data = json.dumps(data)
df = pd.read_json(json_data)
LOGGER.info("feature set is")
LOGGER.info(df.head())

return df
except Exception:
LOGGER.error(traceback.format_exc())
raise

def train_test_model(self, df):
try:
            # use this data to train the model and evaluate it on a held-out split
model = LinearRegression()
LOGGER.info("training the model")
model.test_train(df=df, target='target', train_split=0.8, test_split=0.2)

except Exception as e:
LOGGER.error(traceback.format_exc())
raise e

def train_model(self, df):
try:
            # use this data to train the model
model = LinearRegression()
model.train(df=df, target='target')
return model
        except Exception as e:
            LOGGER.error(traceback.format_exc())
            raise e


if __name__ == '__main__':
terminal_traffic = TerminalTraffic()
data = terminal_traffic.get_data()
df = terminal_traffic.generate_feature_from_data(inp_data=data)
terminal_traffic.train_test_model(df=df)
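    # Hypothetical usage sketch (illustrative only): persist the trained model and
    # reload it to score a new slot. The file-system path and the sample payload
    # below are placeholders, not values used elsewhere in this project.
    # model = terminal_traffic.train_model(df=df)
    # model.persist(location='models/terminal_traffic')
    # model.load(location='models/terminal_traffic')
    # sample = terminal_traffic.get_features_for_prediction(data=[{"hour": 9, "day": 2, "terminal_code": 50}])
    # model.predict(df=sample)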
