Adding scripts to populate data into the database
Showing 5 changed files with 544 additions and 0 deletions.
constants.py
@@ -0,0 +1,27 @@
class Constants:
    # database namespaces
    GRN_DATABASE_NAMESPACE = "gene_regulatory_network"
    PPI_DATABASE_NAMESPACE = "protein_protein_interactions"

    # network types
    GRN_NETWORK_MODE = "grn"
    PPI_NETWORK_MODE = "ppi"

    # data file paths
    DATA_DIRECTORY = "script-results"
    GENE_DATA_FILEPATH = DATA_DIRECTORY + "/gene_data.tsv"
    PROTEIN_DATA_FILEPATH = DATA_DIRECTORY + "/protein_data.tsv"
    GENE_REGULATORY_NETWORK_DATA_FILEPATH = DATA_DIRECTORY + "/gene_regulatory_network_data.tsv"
    PROTEIN_PROTEIN_INTERACTIONS_DATA_FILEPATH = DATA_DIRECTORY + "/protein_protein_interactions_data.tsv"
    SOURCE_DATA_FILEPATH = DATA_DIRECTORY + "/source_data.tsv"

    # missing and update file paths
    MISSING_DATA_DIRECTORY = DATA_DIRECTORY + "/missing_data"
    UPDATE_DATA_DIRECTORY = DATA_DIRECTORY + "/update_data"
    MISSING_GRN_GENE_DATA_FILEPATH = MISSING_DATA_DIRECTORY + "/missing_grn_gene_data.tsv"
    UPDATE_GRN_GENE_DATA_FILEPATH = UPDATE_DATA_DIRECTORY + "/update_grn_gene_data.tsv"
    MISSING_PPI_GENE_DATA_FILEPATH = MISSING_DATA_DIRECTORY + "/missing_ppi_gene_data.tsv"
    UPDATE_PPI_GENE_DATA_FILEPATH = UPDATE_DATA_DIRECTORY + "/update_ppi_gene_data.tsv"
    MISSING_PROTEIN_DATA_FILEPATH = MISSING_DATA_DIRECTORY + "/missing_protein_data.tsv"
    UPDATE_PROTEIN_DATA_FILEPATH = UPDATE_DATA_DIRECTORY + "/update_protein_data.tsv"
    UPDATE_PROTEIN_NAME_DATA_FILEPATH = UPDATE_DATA_DIRECTORY + "/update_protein_name_data.tsv"
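Taken together, these path constants imply the following layout for the script outputs (derived directly from the values above):

script-results/
├── gene_data.tsv
├── protein_data.tsv
├── gene_regulatory_network_data.tsv
├── protein_protein_interactions_data.tsv
├── source_data.tsv
├── missing_data/
│   ├── missing_grn_gene_data.tsv
│   ├── missing_ppi_gene_data.tsv
│   └── missing_protein_data.tsv
└── update_data/
    ├── update_grn_gene_data.tsv
    ├── update_ppi_gene_data.tsv
    ├── update_protein_data.tsv
    └── update_protein_name_data.tsv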
@@ -0,0 +1,152 @@
import psycopg2
import csv
import pandas as pd
from constants import Constants


class Filter:
    def __init__(self, db_url, save_service):
        self.db_url = db_url
        self.save_service = save_service

    def get_all_db_data(self, database_namespace, table_name, columns):
        """
        Fetch all data from the specified table and return it as a list of dictionaries.
        """
        conn = psycopg2.connect(self.db_url)
        cursor = conn.cursor()

        query = f"SELECT {', '.join(columns)} FROM {database_namespace}.{table_name};"
        cursor.execute(query)

        rows = cursor.fetchall()
        column_names = [desc[0] for desc in cursor.description]

        result = [dict(zip(column_names, row)) for row in rows]

        cursor.close()
        conn.close()

        return result

    def filter_data(self, data_filepath, db_data, key_columns, update_columns):
        """
        Filter the data to return:
        - Records that need to be inserted.
        - Records that need to be updated.
        """
        with open(data_filepath, 'r') as f:
            reader = csv.DictReader(f, delimiter='\t')
            data = list(reader)

        db_keys = {tuple(row[col] for col in key_columns): row for row in db_data}

        insert_data = []
        update_data = []
        update_data_names = []

        for row in data:
            key_tuple = tuple(row[col] for col in key_columns)
            if key_tuple in db_keys:
                db_record = db_keys[key_tuple]
                changes_needed = False

                for col in update_columns:
                    if str(row[col]).lower() != str(db_record[col]).lower():
                        # Special case for protein data: also record when a protein's
                        # standard name has changed, so the rename can be applied separately.
                        if col == "standard_name" and data_filepath == Constants.PROTEIN_DATA_FILEPATH:
                            update_data_names.append({
                                "old_standard_name": db_record[col],
                                "new_standard_name": row[col],
                            })

                        changes_needed = True
                        break

                if changes_needed:
                    update_data.append({
                        **{col: row[col] for col in key_columns + update_columns},
                    })
            else:
                insert_data.append(row)

        insert_data_df = pd.DataFrame(insert_data)
        update_data_df = pd.DataFrame(update_data)

        self.save_service.save(insert_data_df, Constants.MISSING_DATA_DIRECTORY, self.missing_filepath)
        self.save_service.save(update_data_df, Constants.UPDATE_DATA_DIRECTORY, self.update_filepath)

        if data_filepath == Constants.PROTEIN_DATA_FILEPATH:
            update_data_names_df = pd.DataFrame(update_data_names)
            self.save_service.save(update_data_names_df, Constants.UPDATE_DATA_DIRECTORY, Constants.UPDATE_PROTEIN_NAME_DATA_FILEPATH)


class ProteinFilter(Filter):
    def __init__(self, db_url, save_service):
        super().__init__(db_url, save_service)
        self.missing_filepath = Constants.MISSING_PROTEIN_DATA_FILEPATH
        self.update_filepath = Constants.UPDATE_PROTEIN_DATA_FILEPATH

    def get_all_db_data(self):
        """
        Fetch all protein data from the database.
        """
        columns = ["standard_name", "gene_systematic_name", "length", "molecular_weight", "pi"]
        return super().get_all_db_data(Constants.PPI_DATABASE_NAMESPACE, "protein", columns)

    def filter_data(self):
        """
        Filter protein data that is missing or needs to be updated in the database.
        """
        db_data = self.get_all_db_data()

        key_columns = ["gene_systematic_name"]
        update_columns = ["standard_name", "length", "molecular_weight", "pi"]

        return super().filter_data(Constants.PROTEIN_DATA_FILEPATH, db_data, key_columns, update_columns)


class GeneFilter(Filter):
    def __init__(self, db_url, save_service, network_mode):
        super().__init__(db_url, save_service)
        self.network_mode = network_mode
        if network_mode == Constants.GRN_NETWORK_MODE:
            self.missing_filepath = Constants.MISSING_GRN_GENE_DATA_FILEPATH
            self.update_filepath = Constants.UPDATE_GRN_GENE_DATA_FILEPATH
            self.database_namespace = Constants.GRN_DATABASE_NAMESPACE
        elif network_mode == Constants.PPI_NETWORK_MODE:
            self.missing_filepath = Constants.MISSING_PPI_GENE_DATA_FILEPATH
            self.update_filepath = Constants.UPDATE_PPI_GENE_DATA_FILEPATH
            self.database_namespace = Constants.PPI_DATABASE_NAMESPACE
        else:
            raise ValueError("Unknown network type specified.")

    def get_all_db_data(self):
        """
        Fetch all gene data from the database.
        """
        if self.network_mode == Constants.GRN_NETWORK_MODE:
            columns = ["gene_id", "display_gene_id", "regulator"]
        elif self.network_mode == Constants.PPI_NETWORK_MODE:
            columns = ["gene_id", "display_gene_id"]
        else:
            raise ValueError("Unknown network type specified.")

        return super().get_all_db_data(self.database_namespace, "gene", columns)

    def filter_data(self):
        """
        Filter gene data that is missing or needs to be updated in the database.
        """
        if self.network_mode == Constants.GRN_NETWORK_MODE:
            update_columns = ["display_gene_id", "regulator"]
        elif self.network_mode == Constants.PPI_NETWORK_MODE:
            update_columns = ["display_gene_id"]
        else:
            raise ValueError("Unknown network type specified.")

        key_columns = ["gene_id"]

        db_data = self.get_all_db_data()

        return super().filter_data(Constants.GENE_DATA_FILEPATH, db_data, key_columns, update_columns)
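For reference, a minimal usage sketch of these filter classes. The db_url and save_service values are assumptions, not part of this commit: any PostgreSQL connection string works for the former, and the latter can be any object exposing the save(dataframe, directory, filepath) method that filter_data calls.

# Hypothetical wiring; DB_URL and SaveService are assumed, not defined in this diff.
DB_URL = "postgresql://user:password@localhost:5432/network_db"  # assumed connection string
save_service = SaveService()  # assumed helper with save(dataframe, directory, filepath)

# Compare script-results/gene_data.tsv against gene_regulatory_network.gene,
# writing new rows and changed rows to the missing_data/ and update_data/ TSVs.
GeneFilter(DB_URL, save_service, Constants.GRN_NETWORK_MODE).filter_data()

# Protein comparison; also emits update_protein_name_data.tsv whenever a
# protein's standard_name differs between the file and the database.
ProteinFilter(DB_URL, save_service).filter_data()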
database2/network-database/database_services/populator.py
125 changes: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
import psycopg2
from abc import ABC, abstractmethod
from constants import Constants
from io import StringIO


class DataPopulator(ABC):

    def __init__(self, db_url):
        self.db_url = db_url
        self.filepath = None
        self.network_mode = None

    @abstractmethod
    def get_copy_statement(self):
        """
        Return the COPY SQL statement for the specific type of data.
        """
        pass

    def determine_database_namespace(self, network_mode):
        if network_mode == Constants.GRN_NETWORK_MODE:
            return Constants.GRN_DATABASE_NAMESPACE
        elif network_mode == Constants.PPI_NETWORK_MODE:
            return Constants.PPI_DATABASE_NAMESPACE
        else:
            raise ValueError(f"Unknown network type: {network_mode}")

    def process_file(self, conn, cursor, data_filepath, copy_statement):
        """
        A helper that streams the input file into the database via the COPY
        command. For the missing PPI gene file, the last column is dropped
        from the input data before loading.
        """
        # Determine whether the last column needs to be dropped (PPI gene file only)
        print("NETWORK TYPE: ", self.network_mode)
        print("DATA FILEPATH: ", data_filepath)
        if self.network_mode == Constants.PPI_NETWORK_MODE and data_filepath == Constants.MISSING_PPI_GENE_DATA_FILEPATH:
            print("Dropping the last column from the input data...")
            processed_rows = []

            with open(data_filepath, 'r') as f:
                for line in f:
                    columns = line.strip().split('\t')
                    processed_row = columns[:-1]
                    processed_rows.append('\t'.join(processed_row))

            temp_file = StringIO("\n".join(processed_rows))

            # Execute the COPY command using the processed data (without the last column)
            cursor.copy_expert(sql=copy_statement, file=temp_file)
            conn.commit()

        else:
            with open(data_filepath, 'r') as f:
                cursor.copy_expert(sql=copy_statement, file=f)
                conn.commit()

        print(f"Data from {data_filepath} has been successfully populated.")

    def populate_data(self):
        conn = psycopg2.connect(self.db_url)
        cursor = conn.cursor()

        copy_statement = self.get_copy_statement()

        self.process_file(conn, cursor, self.filepath, copy_statement)

        cursor.close()
        conn.close()


class GeneDataPopulator(DataPopulator):
    def __init__(self, db_url, network_mode):
        super().__init__(db_url)
        self.network_mode = network_mode
        if network_mode == Constants.GRN_NETWORK_MODE:
            self.database_namespace = Constants.GRN_DATABASE_NAMESPACE
            self.filepath = Constants.MISSING_GRN_GENE_DATA_FILEPATH
        elif network_mode == Constants.PPI_NETWORK_MODE:
            self.database_namespace = Constants.PPI_DATABASE_NAMESPACE
            self.filepath = Constants.MISSING_PPI_GENE_DATA_FILEPATH
        else:
            raise ValueError(f"Unknown network type: {network_mode}")

    def get_copy_statement(self):
        if self.network_mode == Constants.GRN_NETWORK_MODE:
            return f"COPY {self.database_namespace}.gene (gene_id, display_gene_id, species, taxon_id, regulator) FROM stdin WITH CSV DELIMITER E'\\t' HEADER;"
        elif self.network_mode == Constants.PPI_NETWORK_MODE:
            return f"COPY {self.database_namespace}.gene (gene_id, display_gene_id, species, taxon_id) FROM stdin WITH CSV DELIMITER E'\\t' HEADER;"
        else:
            raise ValueError(f"Unknown network type: {self.network_mode}")


class ProteinDataPopulator(DataPopulator):
    def __init__(self, db_url):
        super().__init__(db_url)
        self.filepath = Constants.MISSING_PROTEIN_DATA_FILEPATH

    def get_copy_statement(self):
        return f"COPY {Constants.PPI_DATABASE_NAMESPACE}.protein (standard_name, gene_systematic_name, length, molecular_weight, pi, taxon_id) FROM stdin WITH CSV DELIMITER E'\\t' HEADER;"


class GeneRegulatoryNetworkDataPopulator(DataPopulator):
    def __init__(self, db_url):
        super().__init__(db_url)
        self.filepath = Constants.GENE_REGULATORY_NETWORK_DATA_FILEPATH

    def get_copy_statement(self):
        return f"COPY {Constants.GRN_DATABASE_NAMESPACE}.network (regulator_gene_id, target_gene_id, taxon_id, time_stamp, source) FROM stdin WITH CSV DELIMITER E'\\t' HEADER;"


class ProteinProteinInteractionsDataPopulator(DataPopulator):
    def __init__(self, db_url):
        super().__init__(db_url)
        self.filepath = Constants.PROTEIN_PROTEIN_INTERACTIONS_DATA_FILEPATH

    def get_copy_statement(self):
        return f"COPY {Constants.PPI_DATABASE_NAMESPACE}.physical_interactions (protein1, protein2, interaction_detection_methods_identifier, experiment_name, time_stamp, source) FROM stdin WITH CSV DELIMITER E'\\t' HEADER;"


class SourceDataPopulator(DataPopulator):
    def __init__(self, db_url, network_mode):
        super().__init__(db_url)
        self.network_mode = network_mode
        self.database_namespace = self.determine_database_namespace(network_mode)
        self.filepath = Constants.SOURCE_DATA_FILEPATH

    def get_copy_statement(self):
        return f"COPY {self.database_namespace}.source (time_stamp, source, display_name) FROM stdin WITH CSV DELIMITER E'\\t' HEADER;"