From bdfd1e951bfc7b584c52768bb10d415a44ff5305 Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Fri, 10 Jan 2025 16:32:14 +0000 Subject: [PATCH] ruff check/format --- .../001_create_notifications_table.py | 12 +- services/poi_monitor/requirements-dev.txt | 3 +- services/poi_monitor/setup.py | 2 +- services/poi_monitor/src/__init__.py | 2 +- services/poi_monitor/src/analyzer.py | 227 ++++++++++-------- services/poi_monitor/src/database.py | 63 ++--- services/poi_monitor/src/migration.py | 32 +-- services/poi_monitor/src/monitor.py | 34 ++- services/poi_monitor/src/notification.py | 40 +-- services/poi_monitor/tests/test_analyzer.py | 78 +++--- services/poi_monitor/tests/test_database.py | 50 ++-- services/poi_monitor/tests/test_migration.py | 19 +- services/poi_monitor/tests/test_monitor.py | 63 ++--- .../poi_monitor/tests/test_notification.py | 45 ++-- 14 files changed, 360 insertions(+), 310 deletions(-) diff --git a/services/poi_monitor/migrations/001_create_notifications_table.py b/services/poi_monitor/migrations/001_create_notifications_table.py index edbf886..ca63f14 100644 --- a/services/poi_monitor/migrations/001_create_notifications_table.py +++ b/services/poi_monitor/migrations/001_create_notifications_table.py @@ -4,9 +4,10 @@ logger = logging.getLogger(__name__) + def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: """Create the poi_notifications table.""" - + up_sql = """ CREATE TABLE IF NOT EXISTS poi_notifications ( id SERIAL PRIMARY KEY, @@ -20,7 +21,7 @@ def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: CREATE INDEX IF NOT EXISTS idx_poi_notifications_sent_at ON poi_notifications(sent_at); """ - + try: with conn.cursor() as cur: cur.execute(up_sql) @@ -31,13 +32,14 @@ def migrate_up(conn: Optional[psycopg2.extensions.connection] = None) -> None: logger.error(f"Failed to create poi_notifications table: {str(e)}") raise + def migrate_down(conn: Optional[psycopg2.extensions.connection] = None) -> None: """Remove the poi_notifications table.""" - + down_sql = """ DROP TABLE IF EXISTS poi_notifications; """ - + try: with conn.cursor() as cur: cur.execute(down_sql) @@ -46,4 +48,4 @@ def migrate_down(conn: Optional[psycopg2.extensions.connection] = None) -> None: except Exception as e: conn.rollback() logger.error(f"Failed to drop poi_notifications table: {str(e)}") - raise \ No newline at end of file + raise diff --git a/services/poi_monitor/requirements-dev.txt b/services/poi_monitor/requirements-dev.txt index 6070917..ca770e4 100644 --- a/services/poi_monitor/requirements-dev.txt +++ b/services/poi_monitor/requirements-dev.txt @@ -1,3 +1,4 @@ pytest==8.0.0 pytest-cov==4.1.0 -pytest-mock==3.12.0 \ No newline at end of file +pytest-mock==3.12.0 +ruff==0.3.0 \ No newline at end of file diff --git a/services/poi_monitor/setup.py b/services/poi_monitor/setup.py index b69e48f..b0584c8 100644 --- a/services/poi_monitor/setup.py +++ b/services/poi_monitor/setup.py @@ -12,4 +12,4 @@ "pyyaml", "python-json-logger", ], -) \ No newline at end of file +) diff --git a/services/poi_monitor/src/__init__.py b/services/poi_monitor/src/__init__.py index c7d9a6d..7ff37fb 100644 --- a/services/poi_monitor/src/__init__.py +++ b/services/poi_monitor/src/__init__.py @@ -18,4 +18,4 @@ from .monitor import main __version__ = "0.1.0" -__all__ = ['PoiAnalyzer', 'Database', 'SlackNotifier', 'main'] +__all__ = ["PoiAnalyzer", "Database", "SlackNotifier", "main"] diff --git a/services/poi_monitor/src/analyzer.py b/services/poi_monitor/src/analyzer.py index 38819eb..9a96ac2 100644 --- a/services/poi_monitor/src/analyzer.py +++ b/services/poi_monitor/src/analyzer.py @@ -1,7 +1,6 @@ # Import external libraries import logging -from typing import Dict, Set, List, Optional, Tuple -from datetime import datetime, timedelta +from typing import Dict, Set, List, Optional import os import requests @@ -12,11 +11,12 @@ # Configure logging logger = logging.getLogger(__name__) + class PoiAnalyzer: """Class to analyze POI submissions and notify slack if discrepancies are found.""" - + def __init__(self, database: Database, notifier: SlackNotifier): - """Initialize the POI analyzer with a database and notifier. + """Initialize the POI analyzer with a database and notifier. These are internal modules that are injected into the class.""" self.db = database self.notifier = notifier @@ -24,7 +24,7 @@ def __init__(self, database: Database, notifier: SlackNotifier): def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: """Analyze POI submissions and detect discrepancies between indexers. - + This method checks if different indexers have submitted different POI (Proof of Indexing) hashes for the same deployment and block. A discrepancy indicates that indexers disagree about the correct POI value. @@ -38,7 +38,7 @@ def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: - Notification was already sent - No POI submissions exist - Only one unique POI hash exists (no discrepancy) - + Returns a dictionary containing discrepancy details if multiple different POI hashes are found, with structure: { @@ -54,46 +54,54 @@ def analyze_pois(self, deployment_id: str, block_number: int) -> Optional[Dict]: try: # Skip if we've already notified about this deployment/block if self.db.check_notification_sent(deployment_id, block_number): - logger.debug(f"Already notified about {deployment_id} at block {block_number}") + logger.debug( + f"Already notified about {deployment_id} at block {block_number}" + ) return None # Get all POI submissions for this deployment/block poi_submissions = self.db.get_latest_pois(deployment_id, block_number) - + if not poi_submissions: - logger.debug(f"No POI submissions found for {deployment_id} at block {block_number}") + logger.debug( + f"No POI submissions found for {deployment_id} at block {block_number}" + ) return None - + # If there's only one POI hash, there's no discrepancy if len(poi_submissions) == 1: - logger.debug(f"All indexers agree on POI for {deployment_id} at block {block_number}") + logger.debug( + f"All indexers agree on POI for {deployment_id} at block {block_number}" + ) return None - + # We have a discrepancy - format the data discrepancy_data = { - 'deployment_cid': deployment_id, - 'block_number': block_number, - 'submissions': poi_submissions, - 'reuse_info': self._check_poi_reuse(poi_submissions) + "deployment_cid": deployment_id, + "block_number": block_number, + "submissions": poi_submissions, + "reuse_info": self._check_poi_reuse(poi_submissions), } - + return discrepancy_data - + except Exception as e: logger.error(f"Error analyzing POIs: {str(e)}", exc_info=True) raise - def _check_poi_reuse(self, submissions: Dict[str, Set[str]]) -> Dict[str, List[str]]: + def _check_poi_reuse( + self, submissions: Dict[str, Set[str]] + ) -> Dict[str, List[str]]: """Check if any POIs have been reused from other blocks/deployments. - + Params: submissions: Dictionary mapping POI hashes to sets of indexer addresses - + Returns: Dict mapping POI hashes to lists of reuse information """ reuse_info = {} - + query = """ SELECT p.poi, @@ -110,55 +118,63 @@ def _check_poi_reuse(self, submissions: Dict[str, Set[str]]) -> Dict[str, List[s WHERE p.poi = ANY(%s) ORDER BY p.created_at DESC """ - + try: - with self.db.get_connection() as conn: - with conn.cursor() as cur: - poi_hashes = list(submissions.keys()) - cur.execute(query, (poi_hashes,)) - results = cur.fetchall() - - # Group results by POI hash - poi_occurrences = {} - for row in results: - (poi_hash, deployment_id, block_number, - indexer_addr, network, timestamp) = row - if poi_hash not in poi_occurrences: - poi_occurrences[poi_hash] = [] - poi_occurrences[poi_hash].append({ - 'deployment_id': deployment_id, - 'block_number': block_number, - 'indexer_address': indexer_addr, - 'network': network, - 'timestamp': timestamp - }) - - # Format detailed reuse information - for poi_hash, occurrences in poi_occurrences.items(): - if len(occurrences) > 1: # POI appears more than once - reuse_info[poi_hash] = [] - - # Sort by timestamp descending - occurrences.sort(key=lambda x: x['timestamp'], reverse=True) - - # First occurrence is current - current = occurrences[0] - - # Add details for each previous use - for prev in occurrences[1:]: - time_diff = current['timestamp'] - prev['timestamp'] - days_ago = time_diff.days - - reuse_info[poi_hash].append( - f"Previously used {days_ago} days ago:\n" - f"• Network: {prev['network']}\n" - f"• Deployment: {prev['deployment_id']}\n" - f"• Block: {prev['block_number']}\n" - f"• Indexer: {prev['indexer_address']}" - ) - - return reuse_info - + with self.db.get_connection() as conn: + with conn.cursor() as cur: + poi_hashes = list(submissions.keys()) + cur.execute(query, (poi_hashes,)) + results = cur.fetchall() + + # Group results by POI hash + poi_occurrences = {} + for row in results: + ( + poi_hash, + deployment_id, + block_number, + indexer_addr, + network, + timestamp, + ) = row + if poi_hash not in poi_occurrences: + poi_occurrences[poi_hash] = [] + poi_occurrences[poi_hash].append( + { + "deployment_id": deployment_id, + "block_number": block_number, + "indexer_address": indexer_addr, + "network": network, + "timestamp": timestamp, + } + ) + + # Format detailed reuse information + for poi_hash, occurrences in poi_occurrences.items(): + if len(occurrences) > 1: # POI appears more than once + reuse_info[poi_hash] = [] + + # Sort by timestamp descending + occurrences.sort(key=lambda x: x["timestamp"], reverse=True) + + # First occurrence is current + current = occurrences[0] + + # Add details for each previous use + for prev in occurrences[1:]: + time_diff = current["timestamp"] - prev["timestamp"] + days_ago = time_diff.days + + reuse_info[poi_hash].append( + f"Previously used {days_ago} days ago:\n" + f"• Network: {prev['network']}\n" + f"• Deployment: {prev['deployment_id']}\n" + f"• Block: {prev['block_number']}\n" + f"• Indexer: {prev['indexer_address']}" + ) + + return reuse_info + except Exception as e: logger.error(f"Error checking POI reuse: {str(e)}", exc_info=True) return {} @@ -167,38 +183,41 @@ def process_new_submissions(self) -> None: """Process any new POI submissions and send notifications for discrepancies.""" try: recent_submissions = self._get_recent_submissions() - + for deployment_id, block_number in recent_submissions: try: discrepancy = self.analyze_pois(deployment_id, block_number) - + if discrepancy: # Format and send notification - message = self.notifier.format_poi_discrepancy_message(discrepancy) + message = self.notifier.format_poi_discrepancy_message( + discrepancy + ) if self.notifier.send_notification(message): # Record that we sent the notification self.db.record_notification( - deployment_id, - block_number, - message + deployment_id, block_number, message ) except Exception as e: # Log the error but continue processing other submissions - logger.error(f"Error processing submission for deployment {deployment_id} at block {block_number}: {str(e)}", exc_info=True) + logger.error( + f"Error processing submission for deployment {deployment_id} at block {block_number}: {str(e)}", + exc_info=True, + ) continue - + # Cleanup old POI notifications from the database self.db.cleanup_old_notifications(days=60) - + except Exception as e: logger.error(f"Error processing submissions: {str(e)}", exc_info=True) raise def _get_recent_submissions(self) -> List[tuple[str, int]]: """Get list of recent deployment/block combinations to check.""" - graphql_url = os.getenv('GRAPHIX_API_URL', 'http://localhost:8000/graphql') + graphql_url = os.getenv("GRAPHIX_API_URL", "http://localhost:8000/graphql") submissions = set() - + query = """ query { poiAgreementRatios( @@ -219,43 +238,43 @@ def _get_recent_submissions(self) -> List[tuple[str, int]]: } } """ - + try: indexers = self._get_indexers() if not indexers: logger.error("No indexers found") return [] - + for indexer_address in indexers: logger.debug(f"Fetching POIs for indexer {indexer_address}") current_query = query % indexer_address - + response = requests.post( - graphql_url, - json={'query': current_query}, - timeout=10 + graphql_url, json={"query": current_query}, timeout=10 ) response.raise_for_status() data = response.json() - - if 'errors' in data: + + if "errors" in data: logger.error(f"GraphQL errors: {data['errors']}") break - - if 'data' not in data or 'poiAgreementRatios' not in data['data']: + + if "data" not in data or "poiAgreementRatios" not in data["data"]: logger.error("Unexpected GraphQL response format") break - + # Extract POIs from current page - agreements = data['data']['poiAgreementRatios'] + agreements = data["data"]["poiAgreementRatios"] for agreement in agreements: - submissions.add(( - agreement['poi']['deployment']['cid'], - agreement['poi']['block']['number'] - )) - + submissions.add( + ( + agreement["poi"]["deployment"]["cid"], + agreement["poi"]["block"]["number"], + ) + ) + return list(submissions) - + except requests.exceptions.RequestException as e: logger.error(f"Failed to fetch recent submissions: {str(e)}") return [] @@ -276,14 +295,12 @@ def _get_indexers(self) -> List[str]: current_query = query % self.page_size logger.debug(f"Fetching indexers with limit {self.page_size}") response = requests.post( - os.getenv('GRAPHIX_API_URL'), - json={'query': current_query}, - timeout=10 + os.getenv("GRAPHIX_API_URL"), json={"query": current_query}, timeout=10 ) data = response.json() - if 'data' in data and 'indexers' in data['data']: - indexers = data['data']['indexers'] - all_indexers.extend([indexer['address'] for indexer in indexers]) + if "data" in data and "indexers" in data["data"]: + indexers = data["data"]["indexers"] + all_indexers.extend([indexer["address"] for indexer in indexers]) return all_indexers return [] except Exception as e: diff --git a/services/poi_monitor/src/database.py b/services/poi_monitor/src/database.py index 14fe130..df4a4b0 100644 --- a/services/poi_monitor/src/database.py +++ b/services/poi_monitor/src/database.py @@ -1,7 +1,6 @@ import os import logging from typing import Dict, Set -import psycopg2 from psycopg2.pool import SimpleConnectionPool from contextlib import contextmanager from dotenv import load_dotenv @@ -9,6 +8,7 @@ logger = logging.getLogger(__name__) load_dotenv() + class Database: """ This class is the Database manager for POI monitoring. @@ -22,7 +22,7 @@ class Database: def __init__(self): """ Constructor initializes database connection and runs migrations. - + Raises: psycopg2.Error: If database connection cannot be established Exception: If migrations fail to apply @@ -31,13 +31,13 @@ def __init__(self): self.pool = SimpleConnectionPool( minconn=1, maxconn=10, - dbname=os.getenv('POSTGRES_DB', 'graphix'), - user=os.getenv('POSTGRES_USER', 'postgres'), - password=os.getenv('POSTGRES_PASSWORD', 'password'), - host=os.getenv('POSTGRES_HOST', 'localhost'), - port=os.getenv('POSTGRES_PORT', '5433') + dbname=os.getenv("POSTGRES_DB", "graphix"), + user=os.getenv("POSTGRES_USER", "postgres"), + password=os.getenv("POSTGRES_PASSWORD", "password"), + host=os.getenv("POSTGRES_HOST", "localhost"), + port=os.getenv("POSTGRES_PORT", "5433"), ) - + # Get initial connection for migrations with self.get_connection() as conn: self.conn = conn # Store temporary reference for migrations @@ -49,7 +49,7 @@ def get_connection(self): """ Get a database connection from the pool with automatic cleanup (this is faster than creating a new connetion each time we want to talk to the db) - + - Automatically returns connection to pool after use - Handles cleanup even if an exception occurs """ @@ -59,9 +59,11 @@ def get_connection(self): finally: self.pool.putconn(conn) - def get_latest_pois(self, deployment_id: str, block_number: int) -> Dict[str, Set[str]]: + def get_latest_pois( + self, deployment_id: str, block_number: int + ) -> Dict[str, Set[str]]: """Fetch all indexer POI submissions for a specific deployment and block. - + Retrieves all POI submissions and the indexers that submitted them for a given deployment at a specific block number. @@ -85,27 +87,27 @@ def get_latest_pois(self, deployment_id: str, block_number: int) -> Dict[str, Se JOIN sg_deployments d ON d.id = p.sg_deployment_id WHERE d.ipfs_cid = %s AND b.number = %s """ - + with self.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, (deployment_id, block_number)) results = cur.fetchall() - + poi_submissions = {} for poi_hash, indexer_addr in results: if poi_hash not in poi_submissions: poi_submissions[poi_hash] = set() poi_submissions[poi_hash].add(indexer_addr) - + return poi_submissions def check_notification_sent(self, deployment_id: str, block_number: int) -> bool: """Check if we've already notified about the current set of POIs for this deployment/block. - + Params: deployment_id: The deployment CID block_number: The block number - + Returns: bool: True if we've already notified about these exact POIs """ @@ -126,21 +128,25 @@ def check_notification_sent(self, deployment_id: str, block_number: int) -> bool AND n.poi_set = c.poi_set ) """ - + with self.get_connection() as conn: with conn.cursor() as cur: - cur.execute(query, (deployment_id, block_number, deployment_id, block_number)) + cur.execute( + query, (deployment_id, block_number, deployment_id, block_number) + ) return cur.fetchone()[0] - def record_notification(self, deployment_id: str, block_number: int, message: str) -> None: + def record_notification( + self, deployment_id: str, block_number: int, message: str + ) -> None: """Record that a notification was sent. Later used to prevent duplicate notifications/spam. - + Params: deployment_id: The deployment IPFS hash block_number: The block number message: The notification message that was sent """ - + query = """ WITH current_pois AS ( SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set @@ -153,10 +159,13 @@ def record_notification(self, deployment_id: str, block_number: int, message: st SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] FROM current_pois c """ - + with self.get_connection() as conn: with conn.cursor() as cur: - cur.execute(query, (deployment_id, block_number, deployment_id, block_number, message)) + cur.execute( + query, + (deployment_id, block_number, deployment_id, block_number, message), + ) conn.commit() def cleanup_old_notifications(self, days: int = 60) -> None: @@ -165,7 +174,7 @@ def cleanup_old_notifications(self, days: int = 60) -> None: DELETE FROM poi_notifications WHERE sent_at < NOW() - INTERVAL '%s days' """ - + with self.get_connection() as conn: with conn.cursor() as cur: cur.execute(query, (days,)) @@ -174,11 +183,11 @@ def cleanup_old_notifications(self, days: int = 60) -> None: def _run_migrations(self): """Run any pending database migrations.""" from .migration import MigrationManager - + try: manager = MigrationManager(self.conn) manager.apply_migrations() - + # Verify table structure with self.conn.cursor() as cur: cur.execute(""" @@ -188,7 +197,7 @@ def _run_migrations(self): """) columns = cur.fetchall() logger.info(f"poi_notifications table structure: {columns}") - + except Exception as e: logger.error(f"Failed to run migrations: {str(e)}") raise diff --git a/services/poi_monitor/src/migration.py b/services/poi_monitor/src/migration.py index b588fad..926e954 100644 --- a/services/poi_monitor/src/migration.py +++ b/services/poi_monitor/src/migration.py @@ -6,6 +6,7 @@ logger = logging.getLogger(__name__) + class MigrationManager: def __init__(self, conn: psycopg2.extensions.connection): self.conn = conn @@ -34,27 +35,30 @@ def get_applied_migrations(self) -> List[str]: def apply_migrations(self): """Apply all pending migrations.""" applied = set(self.get_applied_migrations()) - + # Get all migration files - migrations_dir = os.path.join(os.path.dirname(__file__), '..', 'migrations') + migrations_dir = os.path.join(os.path.dirname(__file__), "..", "migrations") logger.info(f"Looking for migrations in: {migrations_dir}") - migration_files = sorted([ - f for f in os.listdir(migrations_dir) - if (f.endswith('.py') and f != '__init__.py') or f.endswith('.sql') - ]) + migration_files = sorted( + [ + f + for f in os.listdir(migrations_dir) + if (f.endswith(".py") and f != "__init__.py") or f.endswith(".sql") + ] + ) logger.info(f"Found migration files: {migration_files}") for migration_file in migration_files: migration_name = migration_file[:-3] - + if migration_name in applied: logger.info(f"Skipping already applied migration: {migration_name}") continue logger.info(f"Applying migration: {migration_name}") - + try: - if migration_file.endswith('.sql'): + if migration_file.endswith(".sql"): # Handle SQL files with open(os.path.join(migrations_dir, migration_file)) as f: sql = f.read() @@ -65,18 +69,18 @@ def apply_migrations(self): # Import and run Python migrations module = importlib.import_module(f"migrations.{migration_name}") module.migrate_up(self.conn) - + # Record the migration with self.conn.cursor() as cur: cur.execute( "INSERT INTO poi_monitor_migrations (migration_name) VALUES (%s)", - (migration_name,) + (migration_name,), ) self.conn.commit() - + logger.info(f"Successfully applied migration: {migration_name}") - + except Exception as e: self.conn.rollback() logger.error(f"Failed to apply migration {migration_name}: {str(e)}") - raise \ No newline at end of file + raise diff --git a/services/poi_monitor/src/monitor.py b/services/poi_monitor/src/monitor.py index bc11c4f..9b6d3cd 100644 --- a/services/poi_monitor/src/monitor.py +++ b/services/poi_monitor/src/monitor.py @@ -12,32 +12,25 @@ # Configure logging logging_config = { - 'version': 1, - 'disable_existing_loggers': False, - 'formatters': { - 'json': { - '()': 'pythonjsonlogger.jsonlogger.JsonFormatter', - 'format': '%(asctime)s %(levelname)s %(name)s %(message)s' + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "json": { + "()": "pythonjsonlogger.jsonlogger.JsonFormatter", + "format": "%(asctime)s %(levelname)s %(name)s %(message)s", } }, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'json' - } - }, - 'root': { - 'handlers': ['console'], - 'level': 'INFO' - } + "handlers": {"console": {"class": "logging.StreamHandler", "formatter": "json"}}, + "root": {"handlers": ["console"], "level": "INFO"}, } dictConfig(logging_config) logger = logging.getLogger(__name__) + def main(): logger.info("Starting POI Monitor service...") - + # Initialize components try: db = Database() @@ -47,18 +40,19 @@ def main(): except Exception as e: logger.error(f"Failed to initialize components: {str(e)}") raise - + try: while True: logger.info("Running POI check iteration") analyzer.process_new_submissions() - time.sleep(int(os.getenv('CHECK_INTERVAL', 300))) # Default 5 minutes - + time.sleep(int(os.getenv("CHECK_INTERVAL", 300))) # Default 5 minutes + except KeyboardInterrupt: logger.info("Shutting down POI Monitor service...") except Exception as e: logger.error(f"Unexpected error: {str(e)}", exc_info=True) raise + if __name__ == "__main__": main() diff --git a/services/poi_monitor/src/notification.py b/services/poi_monitor/src/notification.py index eb8f0a5..dfeb833 100644 --- a/services/poi_monitor/src/notification.py +++ b/services/poi_monitor/src/notification.py @@ -5,51 +5,51 @@ logger = logging.getLogger(__name__) + class SlackNotifier: """This class is used to send notifications to a Slack channel.""" + def __init__(self, webhook_url: str = None): """ Initializes the SlackNotifier with a webhook URL. - + Params: webhook_url: The Slack webhook URL Raises: ValueError: If the Slack webhook URL is not provided """ - self.webhook_url = webhook_url or os.getenv('SLACK_WEBHOOK_URL') + self.webhook_url = webhook_url or os.getenv("SLACK_WEBHOOK_URL") if not self.webhook_url: raise ValueError("Slack webhook URL not provided") def send_notification(self, message: str) -> bool: """Send a notification to Slack. - + Params: message: The formatted message to send - + Returns: bool: True if the message was sent successfully """ try: response = requests.post( - self.webhook_url, - json={"text": message}, - timeout=10 + self.webhook_url, json={"text": message}, timeout=10 ) response.raise_for_status() logger.info("Successfully sent Slack notification") return True - + except requests.exceptions.RequestException as e: logger.error(f"Failed to send Slack notification: {str(e)}") return False def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: """Format POI discrepancy data into a Slack message. - + Params: data: Dictionary containing POI discrepancy information - + Returns: str: Formatted message ready to send to Slack """ @@ -57,23 +57,23 @@ def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: "🚨 *New POI Discrepancy Found*", f"*Deployment:* `{data['deployment_cid']}`", f"*Block:* `{data['block_number']}`", - "*POI Submissions:*" + "*POI Submissions:*", ] - for poi_hash, indexers in data['submissions'].items(): + for poi_hash, indexers in data["submissions"].items(): # Convert memoryview to hex string if needed if isinstance(poi_hash, memoryview): poi_hash = poi_hash.hex() - + submission_parts = [f"*POI Hash:* `{poi_hash}`"] - + # Add POI reuse information if available - if 'reuse_info' in data and poi_hash in data['reuse_info']: - reuse_data = data['reuse_info'][poi_hash] + if "reuse_info" in data and poi_hash in data["reuse_info"]: + reuse_data = data["reuse_info"][poi_hash] submission_parts.append("⚠️ *POI Reuse:*") for detail in reuse_data: submission_parts.append(f" • {detail}") - + # Convert indexer addresses if they're memoryview formatted_indexers = [] for indexer in indexers: @@ -81,8 +81,10 @@ def format_poi_discrepancy_message(self, data: Dict[str, Any]) -> str: formatted_indexers.append(indexer.hex()) else: formatted_indexers.append(str(indexer)) - - submission_parts.append(f"*Submitted by:* `{', '.join(sorted(formatted_indexers))}`") + + submission_parts.append( + f"*Submitted by:* `{', '.join(sorted(formatted_indexers))}`" + ) message_parts.extend(submission_parts) message_parts.append("") # Add spacing between submissions diff --git a/services/poi_monitor/tests/test_analyzer.py b/services/poi_monitor/tests/test_analyzer.py index 10a73dc..492565c 100644 --- a/services/poi_monitor/tests/test_analyzer.py +++ b/services/poi_monitor/tests/test_analyzer.py @@ -6,6 +6,7 @@ from src.notification import SlackNotifier import requests + @pytest.fixture def mock_db(): db = Mock(spec=Database) @@ -13,71 +14,73 @@ def mock_db(): context_mock = MagicMock() mock_conn = MagicMock() mock_cursor = MagicMock() - + # Set up the context manager chain db.get_connection.return_value = context_mock context_mock.__enter__.return_value = mock_conn mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + # Store cursor on db for tests that need it db._test_cursor = mock_cursor return db # Return just the db, not the tuple + @pytest.fixture def mock_notifier(): return Mock(spec=SlackNotifier) + @pytest.fixture def analyzer(mock_db, mock_notifier): return PoiAnalyzer(mock_db, mock_notifier) + def test_analyze_pois_no_discrepancy(analyzer, mock_db): # Setup deployment_id = "Qm123" block_number = 1000 - + # Mock database responses mock_db.check_notification_sent.return_value = False mock_db.get_latest_pois.return_value = { "poi_hash_1": {"indexer1", "indexer2"} # Single POI hash = no discrepancy } - + # Execute result = analyzer.analyze_pois(deployment_id, block_number) - + # Assert assert result is None mock_db.check_notification_sent.assert_called_once_with(deployment_id, block_number) mock_db.get_latest_pois.assert_called_once_with(deployment_id, block_number) + def test_analyze_pois_with_discrepancy(analyzer, mock_db): # Setup deployment_id = "Qm123" block_number = 1000 - + # Mock database responses mock_db.check_notification_sent.return_value = False mock_db.get_latest_pois.return_value = { "poi_hash_1": {"indexer1"}, - "poi_hash_2": {"indexer2"} # Two different POI hashes = discrepancy + "poi_hash_2": {"indexer2"}, # Two different POI hashes = discrepancy } - + # Execute result = analyzer.analyze_pois(deployment_id, block_number) - + # Assert assert result is not None assert result["deployment_cid"] == deployment_id assert result["block_number"] == block_number assert result["submissions"] == mock_db.get_latest_pois.return_value + def test_check_poi_reuse(analyzer): """Test POI reuse detection.""" # Setup - submissions = { - "poi_hash_1": {"indexer1"}, - "poi_hash_2": {"indexer2"} - } + submissions = {"poi_hash_1": {"indexer1"}, "poi_hash_2": {"indexer2"}} # Mock database responses mock_cursor = analyzer.db._test_cursor @@ -85,7 +88,7 @@ def test_check_poi_reuse(analyzer): # Match the columns from the query: # poi, deployment_id, block_number, indexer_address, network_name, submission_time ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", datetime.now()), - ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()) + ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()), ] # Execute @@ -96,79 +99,80 @@ def test_check_poi_reuse(analyzer): assert len(result["poi_hash_1"]) == 1 assert "Previously used" in result["poi_hash_1"][0] + def test_analyze_pois_already_notified(analyzer, mock_db): """Test that we don't re-notify about known discrepancies.""" deployment_id = "Qm123" block_number = 1000 - + mock_db.check_notification_sent.return_value = True - + result = analyzer.analyze_pois(deployment_id, block_number) assert result is None mock_db.get_latest_pois.assert_not_called() + def test_analyze_pois_no_submissions(analyzer, mock_db): """Test handling of blocks with no POI submissions.""" deployment_id = "Qm123" block_number = 1000 - + mock_db.check_notification_sent.return_value = False mock_db.get_latest_pois.return_value = {} - + result = analyzer.analyze_pois(deployment_id, block_number) assert result is None + def test_process_new_submissions_handles_errors(analyzer, mock_db): """Test error handling in the main processing loop.""" # Mock _get_recent_submissions to return some test data - analyzer._get_recent_submissions = Mock(return_value=[ - ("Qm123", 1000), - ("Qm456", 2000) - ]) - + analyzer._get_recent_submissions = Mock( + return_value=[("Qm123", 1000), ("Qm456", 2000)] + ) + # Make analyze_pois raise an exception for the second submission def mock_analyze(deployment_id, block_number): if deployment_id == "Qm456": raise Exception("Test error") return None - + analyzer.analyze_pois = Mock(side_effect=mock_analyze) mock_db.cleanup_old_notifications = Mock() - + # This should not raise an exception and should continue processing analyzer.process_new_submissions() - + # Verify we tried to process both submissions assert analyzer.analyze_pois.call_count == 2 # Verify cleanup was still called mock_db.cleanup_old_notifications.assert_called_once() + def test_get_recent_submissions_handles_api_errors(analyzer): """Test handling of GraphQL API errors.""" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: # Mock a failed API response mock_post.side_effect = requests.exceptions.RequestException("API Error") - + result = analyzer._get_recent_submissions() assert result == [] # Should return empty list on error + def test_check_poi_reuse_with_multiple_reuses(analyzer): """Test POI reuse detection with multiple reuse patterns.""" - submissions = { - "poi_hash_1": {"indexer1"}, - "poi_hash_2": {"indexer2"} - } - + submissions = {"poi_hash_1": {"indexer1"}, "poi_hash_2": {"indexer2"}} + now = datetime.now() - + # Mock database responses mock_cursor = analyzer.db._test_cursor mock_cursor.fetchall.return_value = [ ("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", now), ("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", now), ("poi_hash_2", "deployment1", 1000, b"addr1", "mainnet", now), - ("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now) + ("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now), ] - + result = analyzer._check_poi_reuse(submissions) - assert len(result) == 2 # Both POIs were reused \ No newline at end of file + assert len(result) == 2 # Both POIs were reused diff --git a/services/poi_monitor/tests/test_database.py b/services/poi_monitor/tests/test_database.py index e3e2ff6..14cabd6 100644 --- a/services/poi_monitor/tests/test_database.py +++ b/services/poi_monitor/tests/test_database.py @@ -1,83 +1,88 @@ import pytest -from datetime import datetime from unittest.mock import Mock, MagicMock, patch from src.database import Database import psycopg2 + @pytest.fixture def mock_conn(): conn = MagicMock() - with patch('psycopg2.connect', return_value=conn): + with patch("psycopg2.connect", return_value=conn): yield conn + @pytest.fixture def database(mock_conn): """Create a Database instance with mocked connection.""" - with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: + with patch("psycopg2.pool.SimpleConnectionPool") as mock_pool: # Mock the connection pool pool = MagicMock() pool.getconn.return_value = mock_conn mock_pool.return_value = pool - + # Mock migrations - with patch('src.migration.MigrationManager') as mock_manager: + with patch("src.migration.MigrationManager") as mock_manager: mock_manager_instance = Mock() mock_manager.return_value = mock_manager_instance - + db = Database() return db + def test_database_connection_retry(mock_conn): """Test that database connection retries on failure.""" - with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: + with patch("psycopg2.pool.SimpleConnectionPool") as mock_pool: # Make pool creation fail twice then succeed mock_pool.side_effect = [ psycopg2.Error("Test error"), psycopg2.Error("Test error"), - MagicMock() # Successful pool + MagicMock(), # Successful pool ] - + # Mock migrations - with patch('src.migration.MigrationManager') as mock_manager: + with patch("src.migration.MigrationManager") as mock_manager: mock_manager_instance = Mock() mock_manager.return_value = mock_manager_instance - with patch('time.sleep'): # Don't actually sleep in tests + with patch("time.sleep"): # Don't actually sleep in tests db = Database() # Verify pool was created assert db.pool is not None + def test_get_latest_pois(database, mock_conn): """Test fetching latest POI submissions.""" mock_cursor = MagicMock() mock_cursor.fetchall.return_value = [ ("poi1", "indexer1"), ("poi1", "indexer2"), - ("poi2", "indexer3") + ("poi2", "indexer3"), ] mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + result = database.get_latest_pois("deployment1", 1000) - + assert len(result) == 2 # Two unique POIs assert result["poi1"] == {"indexer1", "indexer2"} assert result["poi2"] == {"indexer3"} + def test_check_notification_sent(database, mock_conn): """Test checking if notification was already sent.""" mock_cursor = MagicMock() mock_cursor.fetchone.return_value = (True,) mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + result = database.check_notification_sent("deployment1", 1000) assert result is True + def test_record_notification(database, mock_conn): """Test recording a notification.""" mock_cursor = MagicMock() mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + database.record_notification("deployment1", 1000, "test message") - + # Verify the INSERT query was executed with correct parameters mock_cursor.execute.assert_any_call( """ @@ -92,23 +97,24 @@ def test_record_notification(database, mock_conn): SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] FROM current_pois c """, - ("deployment1", 1000, "deployment1", 1000, "test message") + ("deployment1", 1000, "deployment1", 1000, "test message"), ) mock_conn.commit.assert_called_once() + def test_cleanup_old_notifications(database, mock_conn): """Test cleaning up old notifications.""" mock_cursor = MagicMock() mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + database.cleanup_old_notifications(days=30) - + # Verify the DELETE query was executed with correct parameters mock_cursor.execute.assert_any_call( """ DELETE FROM poi_notifications WHERE sent_at < NOW() - INTERVAL '%s days' """, - (30,) + (30,), ) - mock_conn.commit.assert_called_once() \ No newline at end of file + mock_conn.commit.assert_called_once() diff --git a/services/poi_monitor/tests/test_migration.py b/services/poi_monitor/tests/test_migration.py index 3160c7a..96c71fb 100644 --- a/services/poi_monitor/tests/test_migration.py +++ b/services/poi_monitor/tests/test_migration.py @@ -1,38 +1,41 @@ import pytest -from unittest.mock import Mock, MagicMock, patch +from unittest.mock import MagicMock from src.migration import MigrationManager -import os + @pytest.fixture def mock_conn(): return MagicMock() + @pytest.fixture def manager(mock_conn): return MigrationManager(mock_conn) + def test_ensure_migration_table(manager, mock_conn): """Test migration table creation.""" mock_cursor = MagicMock() mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + # Reset the mock to clear any previous calls mock_conn.commit.reset_mock() - + manager._ensure_migration_table() - + mock_cursor.execute.assert_called_once() mock_conn.commit.assert_called_once() + def test_get_applied_migrations(manager, mock_conn): """Test fetching applied migrations.""" mock_cursor = MagicMock() mock_cursor.fetchall.return_value = [ ("001_create_notifications_table",), - ("002_another_migration",) + ("002_another_migration",), ] mock_conn.cursor.return_value.__enter__.return_value = mock_cursor - + result = manager.get_applied_migrations() assert len(result) == 2 - assert "001_create_notifications_table" in result \ No newline at end of file + assert "001_create_notifications_table" in result diff --git a/services/poi_monitor/tests/test_monitor.py b/services/poi_monitor/tests/test_monitor.py index 875c9f2..3201f2d 100644 --- a/services/poi_monitor/tests/test_monitor.py +++ b/services/poi_monitor/tests/test_monitor.py @@ -5,65 +5,74 @@ from src.notification import SlackNotifier from src.analyzer import PoiAnalyzer + def test_main_initializes_components(): """Test that main properly initializes all components.""" - with patch('src.monitor.Database') as mock_db_class, \ - patch('src.monitor.SlackNotifier') as mock_notifier_class, \ - patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ - patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt): # Break the loop - + with patch("src.monitor.Database") as mock_db_class, patch( + "src.monitor.SlackNotifier" + ) as mock_notifier_class, patch( + "src.monitor.PoiAnalyzer" + ) as mock_analyzer_class, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ): # Break the loop # Setup mocks mock_db = Mock(spec=Database) mock_notifier = Mock(spec=SlackNotifier) mock_analyzer = Mock(spec=PoiAnalyzer) - + mock_db_class.return_value = mock_db mock_notifier_class.return_value = mock_notifier mock_analyzer_class.return_value = mock_analyzer - + # Run main (will be interrupted by KeyboardInterrupt) main() - + # Verify components were initialized mock_db_class.assert_called_once() mock_notifier_class.assert_called_once() mock_analyzer_class.assert_called_once_with(mock_db, mock_notifier) + def test_main_handles_initialization_error(): """Test that main properly handles initialization errors.""" - with patch('src.monitor.Database', side_effect=Exception("Test error")), \ - patch('src.monitor.logger') as mock_logger: - + with patch("src.monitor.Database", side_effect=Exception("Test error")), patch( + "src.monitor.logger" + ) as mock_logger: with pytest.raises(Exception): main() - + mock_logger.error.assert_called_once() + def test_main_processes_submissions(): """Test that main calls process_new_submissions.""" - with patch('src.monitor.Database') as mock_db_class, \ - patch('src.monitor.SlackNotifier') as mock_notifier_class, \ - patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ - patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt): # Stop after first run - + with patch("src.monitor.Database"), patch( + "src.monitor.SlackNotifier" + ), patch( + "src.monitor.PoiAnalyzer" + ) as mock_analyzer_class, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ): # Stop after first run # Setup mocks mock_analyzer = Mock(spec=PoiAnalyzer) mock_analyzer_class.return_value = mock_analyzer - + # Run main main() - + # Verify process_new_submissions was called assert mock_analyzer.process_new_submissions.call_count == 1 + def test_main_handles_keyboard_interrupt(): """Test that main handles keyboard interrupt gracefully.""" - with patch('src.monitor.Database') as mock_db_class, \ - patch('src.monitor.SlackNotifier') as mock_notifier_class, \ - patch('src.monitor.PoiAnalyzer') as mock_analyzer_class, \ - patch('src.monitor.time.sleep', side_effect=KeyboardInterrupt), \ - patch('src.monitor.logger') as mock_logger: - + with patch("src.monitor.Database"), patch( + "src.monitor.SlackNotifier" + ) as _, patch( + "src.monitor.PoiAnalyzer" + ) as _, patch( + "src.monitor.time.sleep", side_effect=KeyboardInterrupt + ), patch("src.monitor.logger") as mock_logger: main() - - mock_logger.info.assert_any_call("Shutting down POI Monitor service...") \ No newline at end of file + + mock_logger.info.assert_any_call("Shutting down POI Monitor service...") diff --git a/services/poi_monitor/tests/test_notification.py b/services/poi_monitor/tests/test_notification.py index bfe7791..470f596 100644 --- a/services/poi_monitor/tests/test_notification.py +++ b/services/poi_monitor/tests/test_notification.py @@ -1,49 +1,48 @@ import pytest -from unittest.mock import Mock, patch +from unittest.mock import patch from src.notification import SlackNotifier import requests + @pytest.fixture def notifier(): - with patch.dict('os.environ', {'SLACK_WEBHOOK_URL': 'http://test.url'}): + with patch.dict("os.environ", {"SLACK_WEBHOOK_URL": "http://test.url"}): return SlackNotifier() + def test_send_notification_success(notifier): """Test successful notification sending.""" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: mock_post.return_value.raise_for_status.return_value = None - + result = notifier.send_notification("test message") - + assert result is True mock_post.assert_called_once() + def test_send_notification_failure(notifier): """Test handling of notification failure.""" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: mock_post.side_effect = requests.exceptions.RequestException("Test error") - + result = notifier.send_notification("test message") - + assert result is False + def test_format_poi_discrepancy_message(notifier): """Test message formatting.""" data = { - 'deployment_cid': 'Qm123', - 'block_number': 1000, - 'submissions': { - 'poi1': {'indexer1', 'indexer2'}, - 'poi2': {'indexer3'} - }, - 'reuse_info': { - 'poi2': ['Previously used in deployment X'] - } + "deployment_cid": "Qm123", + "block_number": 1000, + "submissions": {"poi1": {"indexer1", "indexer2"}, "poi2": {"indexer3"}}, + "reuse_info": {"poi2": ["Previously used in deployment X"]}, } - + message = notifier.format_poi_discrepancy_message(data) - - assert '🚨' in message - assert 'Qm123' in message - assert 'indexer1' in message - assert 'Previously used in deployment X' in message \ No newline at end of file + + assert "🚨" in message + assert "Qm123" in message + assert "indexer1" in message + assert "Previously used in deployment X" in message