Skip to content

Commit

Permalink
chore: auto restarting to fix final remaining memory issues
Browse files Browse the repository at this point in the history
  • Loading branch information
xadahiya committed Oct 23, 2024
1 parent b32d9cc commit 7337a7e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
4 changes: 3 additions & 1 deletion pm2.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ module.exports = {
out_file: "/dev/null",
env: {
NODE_ENV: NODE_ENV,
}
},
cron_restart: "0 * * * *",
autorestart: true
},
]
}
27 changes: 22 additions & 5 deletions snapshotter/system_event_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from eth_utils.address import to_checksum_address
from web3 import Web3
import sys
import os
from snapshotter.processor_distributor import ProcessorDistributor
from snapshotter.settings.config import settings
from snapshotter.utils.callback_helpers import send_telegram_notification_sync
Expand Down Expand Up @@ -117,11 +118,13 @@ def __init__(self, name, **kwargs):
self._initialized = False

async def init(self):
self._logger.info('Initializing SystemEventDetector. Awaiting local collector initialization and bootstrapping for 60 seconds...')
await asyncio.sleep(15)
self._last_processed_block = await self._load_last_processed_block()
await self.processor_distributor.init()
await self._init_check_and_report()
await asyncio.sleep(15)
if self._last_processed_block is None:
self._logger.info('Initializing SystemEventDetector. Awaiting local collector initialization and bootstrapping for 60 seconds...')
await asyncio.sleep(15)
await self._init_check_and_report()
await asyncio.sleep(15)

async def _init_check_and_report(self):
try:
Expand Down Expand Up @@ -230,6 +233,17 @@ def _generic_exit_handler(self, signum, sigframe):
self._shutdown_initiated = True
raise GenericExitOnSignal

async def _save_last_processed_block(self):
with open("last_processed_block.log", 'w') as f:
f.write(str(self._last_processed_block))

async def _load_last_processed_block(self):
# check if last_processed_block.log exists
if os.path.exists("last_processed_block.log"):
with open("last_processed_block.log", 'r') as f:
return int(f.read())
return None

async def _detect_events(self):
"""
Continuously detects events by fetching the current block and comparing it to the last processed block.
Expand Down Expand Up @@ -279,6 +293,8 @@ async def _detect_events(self):

if not self._last_processed_block:
self._last_processed_block = current_block - 1
# save it to last_processed_block file
await self._save_last_processed_block()

if self._last_processed_block >= current_block:
self._logger.info(
Expand All @@ -294,6 +310,7 @@ async def _detect_events(self):
'processing current block',
)
self._last_processed_block = current_block - 10
await self._save_last_processed_block()

# Get events from current block to last_processed_block
try:
Expand Down Expand Up @@ -330,7 +347,7 @@ async def _detect_events(self):
)

self._last_processed_block = current_block

await self._save_last_processed_block()
self._logger.info(
'DONE: Processed blocks till {}',
current_block,
Expand Down

0 comments on commit 7337a7e

Please sign in to comment.