Skip to content

Commit

Permalink
remove repo metadata worker (#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoinTyang authored Jan 2, 2025
1 parent 2a35ed8 commit 18ba73f
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 856 deletions.
2 changes: 2 additions & 0 deletions repo_metadata/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,5 @@ def __init__(self):
FACES_TABLE = FacesTable('faces', '0001', '0004')

TAGS_TABLE = TagsTable('tags', '0002', '0003')

ZERO_OBJ_ID = '0000000000000000000000000000000000000000'
71 changes: 0 additions & 71 deletions repo_metadata/index_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@

from redis.exceptions import ConnectionError as NoMQAvailable, ResponseError, TimeoutError

from seafevents.repo_data import repo_data
from seafevents.mq import get_mq
from seafevents.utils import get_opt_from_conf_or_env
from seafevents.db import init_db_session_class
from seafevents.repo_metadata.metadata_server_api import MetadataServerAPI
from seafevents.repo_metadata.repo_metadata import RepoMetadata
from seafevents.repo_metadata.metadata_manager import MetadataManager
from seafevents.face_recognition.face_recognition_manager import FaceRecognitionManager


Expand Down Expand Up @@ -47,8 +44,6 @@ def __init__(self, config):
self._parse_config(config)

self.mq = get_mq(self.mq_server, self.mq_port, self.mq_password)
self.repo_metadata = RepoMetadata(self.metadata_server_api, self.mq)
self.metadata_manager = MetadataManager(self._db_session_class, self.repo_metadata)
self.face_recognition_manager = FaceRecognitionManager(config)
self.set_signal()

Expand Down Expand Up @@ -81,68 +76,10 @@ def tname(self):
return threading.current_thread().name

def start(self):
for i in range(int(self.worker_num)):
threading.Thread(target=self.worker_handler, name='subscribe_' + str(i), daemon=True).start()

for i in range(int(self.worker_num)):
threading.Thread(target=self.face_cluster_handler, name='face_cluster_' + str(i), daemon=True).start()
threading.Thread(target=self.refresh_lock, name='refresh_thread', daemon=True).start()

def worker_handler(self):
logger.info('%s starting update metadata work' % self.tname)
try:
while not self.should_stop.isSet():
try:
res = self.mq.brpop('metadata_task', timeout=30)
if res is not None:
key, value = res
msg = value.split('\t')
if len(msg) != 2:
logger.info('Bad message: %s' % str(msg))
else:
op_type, repo_id = msg[0], msg[1]
self.worker_task_handler(self.mq, repo_id, self.should_stop, op_type)
except (ResponseError, NoMQAvailable, TimeoutError) as e:
logger.error('The connection to the redis server failed: %s' % e)
except Exception as e:
logger.error('%s Handle Worker Task Error' % self.tname)
logger.error(e, exc_info=True)
# prevent case that redis break at program running.
time.sleep(0.3)

def worker_task_handler(self, mq, repo_id, should_stop, op_type):
# Python cannot kill threads, so stop it generate more locked key.
if not should_stop.isSet():
# set key-value if does not exist which will expire 30 minutes later
if mq.set(self._get_lock_key(repo_id), time.time(),
ex=self.LOCK_TIMEOUT, nx=True):
# get lock
logger.info('%s start updating repo %s' % (threading.currentThread().getName(), repo_id))
lock_key = self._get_lock_key(repo_id)
self.locked_keys.add(lock_key)
self.update_metadata(repo_id)
try:
self.locked_keys.remove(lock_key)
except KeyError:
logger.error("%s is already removed. SHOULD NOT HAPPEN!" % lock_key)
mq.delete(lock_key)
logger.info("%s Finish updating repo: %s, delete redis lock %s" %
(self.tname, repo_id, lock_key))
else:
# the repo is updated by other thread, push back to the queue
self.add_to_undo_task(mq, repo_id, op_type)

def update_metadata(self, repo_id):
commit_id = repo_data.get_repo_head_commit(repo_id)
if not commit_id:
# invalid repo without head commit id
logger.error("invalid repo : %s " % repo_id)
return
try:
self.metadata_manager.update_metadata(repo_id, commit_id)
except Exception as e:
logger.exception('update repo: %s metadata error: %s', repo_id, e)

def face_cluster_handler(self):
face_recognition_logger.info('%s starting face cluster' % self.tname)
try:
Expand Down Expand Up @@ -193,14 +130,6 @@ def update_face_cluster(self, repo_id, username):
except Exception as e:
face_recognition_logger.exception('update repo: %s metadata error: %s', repo_id, e)

def add_to_undo_task(self, mq, repo_id, op_type):
"""Push task back to the end of the queue.
"""
# avoid get the same task repeatedly
time.sleep(0.1)
mq.lpush('metadata_task', '\t'.join([op_type, repo_id]))
logger.debug('%s push back task (%s,) to the queue' % (self.tname, repo_id))

def refresh_lock(self):
logger.info('%s Starting refresh locks' % self.tname)
while True:
Expand Down
134 changes: 0 additions & 134 deletions repo_metadata/metadata_manager.py

This file was deleted.

Loading

0 comments on commit 18ba73f

Please sign in to comment.