-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #20 from allegro/update-dependencies
Remove pandas and update dependencies
- Loading branch information
Showing
5 changed files
with
74 additions
and
156 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,44 @@ | ||
import csv | ||
import logging | ||
from pathlib import Path | ||
from typing import List, Optional, Union | ||
from typing import Any, Dict, List, Optional, Union, OrderedDict | ||
|
||
import fsspec | ||
import pandas as pd | ||
|
||
from allms.constants.input_data import IODataConstants | ||
from allms.domain.input_data import InputData | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def load_csv(
    path: str,
    limit: Optional[int] = None
) -> List[Dict[Any, Any]]:
    """Read a CSV file with a header row into a list of row mappings.

    Each data row becomes one mapping keyed by the column names taken from
    the first line of the file (``csv.DictReader`` semantics).

    Args:
        path: Location of the CSV file on the local filesystem.
        limit: Maximum number of rows to return. ``None`` or ``0`` returns
            every row; other values are applied as the slice ``rows[:limit]``.

    Returns:
        The parsed rows, in file order.
    """
    logger.info(f"Loading test data from {path}")
    # newline='' hands line-ending handling to the csv module; without it,
    # line breaks embedded in quoted fields are rewritten by the text layer.
    with open(path, mode='r', newline='') as csv_file:
        data = list(csv.DictReader(csv_file))
    return data[:limit] if limit else data
|
||
|
||
def load_csv_to_input_data(path: str, limit: Optional[int] = None) -> List[InputData]:
    """Load a CSV file and convert each row into an ``InputData`` object.

    The column named by ``IODataConstants.ID`` supplies the ``id`` (converted
    with ``str``); every other column of the row is passed as
    ``input_mappings``.

    Args:
        path: Location of the CSV file, forwarded to ``load_csv``.
        limit: Optional row limit, forwarded to ``load_csv``.

    Returns:
        One ``InputData`` per CSV row, in file order.

    Raises:
        KeyError: If a row has no ``IODataConstants.ID`` column.
    """
    return [
        InputData(
            input_mappings=drop_dict_key(row, IODataConstants.ID),
            id=str(row[IODataConstants.ID]),
        )
        for row in load_csv(path, limit=limit)
    ]
|
||
|
||
def drop_dict_key(dictionary: Dict[Any, Any], key: Any) -> Dict[Any, Any]:
    """Return a shallow copy of ``dictionary`` with ``key`` removed.

    The input mapping is left unmodified. The copy is made with the
    mapping's own ``copy()`` method, so its concrete type is whatever that
    method returns.

    Args:
        dictionary: Mapping to copy.
        key: Key to leave out of the copy.

    Returns:
        The copy without ``key``.

    Raises:
        KeyError: If ``key`` is not present in ``dictionary``.
    """
    dict_copy = dictionary.copy()
    # pop() without a default is deliberate: a missing key raises KeyError.
    dict_copy.pop(key)
    return dict_copy
|
||
|
||
def load_credentials(path: Union[str, Path]) -> str:
    """Return the first line of the credentials file at ``path``.

    The file is opened in text mode through ``fsspec.open``. The line is
    returned exactly as ``readline()`` yields it, so a trailing newline
    present in the file is kept; an empty file yields ``""``.

    Args:
        path: Location of the credentials file.

    Returns:
        The first line of the file, unstripped.
    """
    with fsspec.open(path, "r") as credentials_file:
        return credentials_file.readline()
Oops, something went wrong.