Skip to content

Commit

Permalink
Updated fillna function in base data_processor. Some changes in aksha…
Browse files Browse the repository at this point in the history
…re, tushare and baostock (#268)

* updated fillna function in base data_processor. Some changes in akshare, tushare and baostock

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* beautlfied by flake8

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* beautlfied by flake8

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
oliverwang15 and pre-commit-ci[bot] authored Feb 6, 2023
1 parent f4f89d1 commit c1037cd
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 348 deletions.
58 changes: 55 additions & 3 deletions meta/data_processors/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,51 @@ def clean_data(self):
]
]

def fillna(self):
df = self.dataframe

dfcode = pd.DataFrame(columns=["tic"])
dfdate = pd.DataFrame(columns=["time"])

dfcode.tic = df.tic.unique()
dfdate.time = df.time.unique()
dfdate.sort_values(by="time", ascending=False, ignore_index=True, inplace=True)

# the old pandas may not support pd.merge(how="cross")
try:
df1 = pd.merge(dfcode, dfdate, how="cross")
except:
print("Please wait for a few seconds...")
df1 = pd.DataFrame(columns=["tic", "time"])
for i in range(dfcode.shape[0]):
for j in range(dfdate.shape[0]):
df1 = df1.append(
pd.DataFrame(
data={
"tic": dfcode.iat[i, 0],
"time": dfdate.iat[j, 0],
},
index=[(i + 1) * (j + 1) - 1],
)
)

df = pd.merge(df1, df, how="left", on=["tic", "time"])

# back fill missing data then front fill
df_new = pd.DataFrame(columns=df.columns)
for i in df.tic.unique():
df_tmp = df[df.tic == i].fillna(method="bfill").fillna(method="ffill")
df_new = pd.concat([df_new, df_tmp], ignore_index=True)

df_new = df_new.fillna(0)

# reshape dataframe
df_new = df_new.sort_values(by=["time", "tic"]).reset_index(drop=True)

print("Shape of DataFrame: ", df_new.shape)

self.dataframe = df_new

def get_trading_days(self, start: str, end: str) -> List[str]:
if self.data_source in [
"binance",
Expand All @@ -108,8 +153,12 @@ def get_trading_days(self, start: str, end: str) -> List[str]:
return None

# select_stockstats_talib: 0 (stockstats, default), or 1 (use talib). Users can choose the method.
# drop_na_timestep: 0 (not dropping timesteps that contain nan), or 1 (dropping timesteps that contain nan, default). Users can choose the method.
def add_technical_indicator(
self, tech_indicator_list: List[str], select_stockstats_talib: int = 0
self,
tech_indicator_list: List[str],
select_stockstats_talib: int = 0,
drop_na_timesteps: int = 1,
):
"""
calculate technical indicators
Expand Down Expand Up @@ -189,8 +238,11 @@ def add_technical_indicator(
self.dataframe = final_df

self.dataframe.sort_values(by=["time", "tic"], inplace=True)
time_to_drop = self.dataframe[self.dataframe.isna().any(axis=1)].time.unique()
self.dataframe = self.dataframe[~self.dataframe.time.isin(time_to_drop)]
if drop_na_timesteps:
time_to_drop = self.dataframe[
self.dataframe.isna().any(axis=1)
].time.unique()
self.dataframe = self.dataframe[~self.dataframe.time.isin(time_to_drop)]
print("Succesfully add technical indicators")

def add_turbulence(self):
Expand Down
184 changes: 19 additions & 165 deletions meta/data_processors/akshare.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def download_data(
):
"""
`pd.DataFrame`
7 columns: A tick symbol, date, open, high, low, close and volume
7 columns: A tick symbol, time, open, high, low, close and volume
for the specified stock ticker
"""
assert self.time_interval in [
Expand All @@ -79,7 +79,7 @@ def download_data(
time.sleep(0.25)

self.dataframe.columns = [
"date",
"time",
"open",
"close",
"high",
Expand All @@ -93,23 +93,23 @@ def download_data(
"tic",
]

self.dataframe.sort_values(by=["date", "tic"], inplace=True)
self.dataframe.sort_values(by=["time", "tic"], inplace=True)
self.dataframe.reset_index(drop=True, inplace=True)

self.dataframe = self.dataframe[
["tic", "date", "open", "high", "low", "close", "volume"]
["tic", "time", "open", "high", "low", "close", "volume"]
]
# self.dataframe.loc[:, 'tic'] = pd.DataFrame((self.dataframe['tic'].tolist()))
self.dataframe["date"] = pd.to_datetime(
self.dataframe["date"], format="%Y-%m-%d"
self.dataframe["time"] = pd.to_datetime(
self.dataframe["time"], format="%Y-%m-%d"
)
self.dataframe["day"] = self.dataframe["date"].dt.dayofweek
self.dataframe["date"] = self.dataframe.date.apply(
self.dataframe["day"] = self.dataframe["time"].dt.dayofweek
self.dataframe["time"] = self.dataframe.time.apply(
lambda x: x.strftime("%Y-%m-%d")
)

self.dataframe.dropna(inplace=True)
self.dataframe.sort_values(by=["date", "tic"], inplace=True)
self.dataframe.sort_values(by=["time", "tic"], inplace=True)
self.dataframe.reset_index(drop=True, inplace=True)

self.save_data(save_path)
Expand All @@ -118,155 +118,9 @@ def download_data(
f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
)

def clean_data(self):
dfc = copy.deepcopy(self.dataframe)

dfcode = pd.DataFrame(columns=["tic"])
dfdate = pd.DataFrame(columns=["date"])

dfcode.tic = dfc.tic.unique()

if "time" in dfc.columns.values.tolist():
dfc = dfc.rename(columns={"time": "date"})

dfdate.date = dfc.date.unique()
dfdate.sort_values(by="date", ascending=False, ignore_index=True, inplace=True)

# the old pandas may not support pd.merge(how="cross")
try:
df1 = pd.merge(dfcode, dfdate, how="cross")
except:
print("Please wait for a few seconds...")
df1 = pd.DataFrame(columns=["tic", "date"])
for i in range(dfcode.shape[0]):
for j in range(dfdate.shape[0]):
df1 = df1.append(
pd.DataFrame(
data={
"tic": dfcode.iat[i, 0],
"date": dfdate.iat[j, 0],
},
index=[(i + 1) * (j + 1) - 1],
)
)

df2 = pd.merge(df1, dfc, how="left", on=["tic", "date"])

# back fill missing data then front fill
df3 = pd.DataFrame(columns=df2.columns)
for i in self.ticker_list:
df4 = df2[df2.tic == i].fillna(method="bfill").fillna(method="ffill")
df3 = pd.concat([df3, df4], ignore_index=True)

df3 = df3.fillna(0)

# reshape dataframe
df3 = df3.sort_values(by=["date", "tic"]).reset_index(drop=True)

if "date" in self.dataframe.columns.values.tolist():
self.dataframe.rename(columns={"date": "time"}, inplace=True)
if "datetime" in self.dataframe.columns.values.tolist():
self.dataframe.rename(columns={"datetime": "time"}, inplace=True)

print("Shape of DataFrame: ", df3.shape)

self.dataframe = df3

def add_technical_indicator(
self,
tech_indicator_list: List[str],
select_stockstats_talib: int = 0,
drop_na_timestpe: int = 0,
):
"""
calculate technical indicators
use stockstats/talib package to add technical inidactors
:param data: (df) pandas dataframe
:return: (df) pandas dataframe
"""
if "date" in self.dataframe.columns.values.tolist():
self.dataframe.rename(columns={"date": "time"}, inplace=True)

if self.data_source == "ccxt":
self.dataframe.rename(columns={"index": "time"}, inplace=True)

self.dataframe.reset_index(drop=False, inplace=True)
if "level_1" in self.dataframe.columns:
self.dataframe.drop(columns=["level_1"], inplace=True)
if "level_0" in self.dataframe.columns and "tic" not in self.dataframe.columns:
self.dataframe.rename(columns={"level_0": "tic"}, inplace=True)
assert select_stockstats_talib in {0, 1}
print("tech_indicator_list: ", tech_indicator_list)
if select_stockstats_talib == 0: # use stockstats
stock = stockstats.StockDataFrame.retype(self.dataframe)
unique_ticker = stock.tic.unique()
for indicator in tech_indicator_list:
print("indicator: ", indicator)
indicator_df = pd.DataFrame()
for i in range(len(unique_ticker)):
try:
temp_indicator = stock[stock.tic == unique_ticker[i]][indicator]
temp_indicator = pd.DataFrame(temp_indicator)
temp_indicator["tic"] = unique_ticker[i]
temp_indicator["time"] = self.dataframe[
self.dataframe.tic == unique_ticker[i]
]["time"].to_list()
indicator_df = pd.concat(
[indicator_df, temp_indicator],
axis=0,
join="outer",
ignore_index=True,
)
except Exception as e:
print(e)
if not indicator_df.empty:
self.dataframe = self.dataframe.merge(
indicator_df[["tic", "time", indicator]],
on=["tic", "time"],
how="left",
)
else: # use talib
final_df = pd.DataFrame()
for i in self.dataframe.tic.unique():
tic_df = self.dataframe[self.dataframe.tic == i]
(
tic_df.loc["macd"],
tic_df.loc["macd_signal"],
tic_df.loc["macd_hist"],
) = talib.MACD(
tic_df["close"],
fastperiod=12,
slowperiod=26,
signalperiod=9,
)
tic_df.loc["rsi"] = talib.RSI(tic_df["close"], timeperiod=14)
tic_df.loc["cci"] = talib.CCI(
tic_df["high"],
tic_df["low"],
tic_df["close"],
timeperiod=14,
)
tic_df.loc["dx"] = talib.DX(
tic_df["high"],
tic_df["low"],
tic_df["close"],
timeperiod=14,
)
final_df = pd.concat([final_df, tic_df], axis=0, join="outer")
self.dataframe = final_df

self.dataframe.sort_values(by=["time", "tic"], inplace=True)
if drop_na_timestpe:
time_to_drop = self.dataframe[
self.dataframe.isna().any(axis=1)
].time.unique()
self.dataframe = self.dataframe[~self.dataframe.time.isin(time_to_drop)]
self.dataframe.rename(columns={"time": "date"}, inplace=True)
print("Succesfully add technical indicators")

def data_split(self, df, start, end, target_date_col="date"):
def data_split(self, df, start, end, target_date_col="time"):
"""
split the dataset into training or testing using date
split the dataset into training or testing using time
:param data: (df) pandas dataframe, start, end
:return: (df) pandas dataframe
"""
Expand All @@ -285,11 +139,11 @@ def transfer_standard_ticker_to_nonstandard(self, ticker: str) -> str:
# assert alpha in ["XSHG", "XSHE"], "Wrong alpha"
return n

def transfer_date(self, date: str) -> str:
if "-" in date:
date = "".join(date.split("-"))
elif "." in date:
date = "".join(date.split("."))
elif "/" in date:
date = "".join(date.split("/"))
return date
def transfer_date(self, time: str) -> str:
if "-" in time:
time = "".join(time.split("-"))
elif "." in time:
time = "".join(time.split("."))
elif "/" in time:
time = "".join(time.split("/"))
return time
4 changes: 4 additions & 0 deletions meta/data_processors/baostock.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ def download_data(
)
bs.logout()

self.dataframe.open = self.dataframe.open.astype(float)
self.dataframe.high = self.dataframe.high.astype(float)
self.dataframe.low = self.dataframe.low.astype(float)
self.dataframe.close = self.dataframe.close.astype(float)
self.save_data(save_path)

print(
Expand Down
Loading

0 comments on commit c1037cd

Please sign in to comment.