Skip to content

Commit

Permalink
Feature/add enable mulitprocess (#842)
Browse files Browse the repository at this point in the history
* change post processing

* version

* add report in eventserver

* fix bug

* fix unit test

* add enable multiprocess

* fix bug

* fix bug

* fix e2e and bug

* fix e2e

* fix e2e

* dockerfile

* add config

* remove usless code

* add report

* bugfix

* fixbug

* reportor add threadpool

* fix activate_group

* fix activate_group

* 1.add redishook 2.add dm_group patch 3.add signal IGN in processserver

* supprt python38 syncmanager

* 1

* review

* dockerfile

* version

* recover pip install image source

* deepcopy in channels, origin in any

* change deepcopy only in any channel

* recover  deepcopy

---------

Co-authored-by: wujiasheng03 <[email protected]>
  • Loading branch information
noO0oOo0ob and wujiasheng03 authored May 20, 2024
1 parent 73afd80 commit 3b11287
Show file tree
Hide file tree
Showing 34 changed files with 1,181 additions and 208 deletions.
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ WORKDIR /usr/src
COPY --from=nodebuilder /usr/src/lyrebird/client/ /usr/src/lyrebird/client/
RUN if [[ -n "$USE_MIRROR" ]] ; then sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories ; fi \
&& apk update \
&& apk add --no-cache build-base jpeg-dev zlib-dev libffi-dev openssl-dev \
&& if [[ -n "$USE_MIRROR" ]] ; then pip install --upgrade pip && pip install --no-cache-dir . facebook-wda==0.8.1 jsonschema -i https://pypi.douban.com/simple ; else pip install --upgrade pip && pip install --no-cache-dir . facebook-wda==0.8.1 jsonschema ; fi \
&& pip install werkzeug==2.2.2 mitmproxy -t /usr/local/mitmenv \
&& apk add --no-cache build-base jpeg-dev zlib-dev libffi-dev openssl-dev redis \
&& if [[ -n "$USE_MIRROR" ]] ; then pip install --upgrade pip -i https://pypi.douban.com/simple && pip install --no-cache-dir . facebook-wda==0.8.1 jsonschema redis -i https://pypi.douban.com/simple ; else pip install --upgrade pip && pip install --no-cache-dir . facebook-wda==0.8.1 jsonschema redis ; fi \
&& if [[ -n "$USE_MIRROR" ]] ; then pip install werkzeug==2.2.2 mitmproxy -t /usr/local/mitmenv -i https://pypi.douban.com/simple ; else pip install werkzeug==2.2.2 mitmproxy -t /usr/local/mitmenv ; fi \
&& rm -rf /usr/src \
&& apk del --purge build-base jpeg-dev zlib-dev libffi-dev openssl-dev

Expand Down
7 changes: 4 additions & 3 deletions e2e_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import signal
import pytest
import time
Expand Down Expand Up @@ -89,7 +90,7 @@ def _find_free_port(self):

def start(self, checker_path=[]):

cmdline = f'python3 -m lyrebird -b -v --no-mitm --mock {self.port} --extra-mock {self.extra_mock_port}'
cmdline = f'{sys.executable} -m lyrebird -b -v --no-mitm --mock {self.port} --extra-mock {self.extra_mock_port}'
for path in checker_path:
cmdline = cmdline + f' --script {path}'
self.lyrebird_process = subprocess.Popen(cmdline, shell=True, start_new_session=True)
Expand All @@ -103,9 +104,9 @@ def start(self, checker_path=[]):
def stop(self):
if self.lyrebird_process:
try:
os.killpg(self.lyrebird_process.pid, signal.SIGTERM)
os.killpg(self.lyrebird_process.pid, signal.SIGINT)
except PermissionError:
os.kill(self.lyrebird_process.pid, signal.SIGTERM)
os.kill(self.lyrebird_process.pid, signal.SIGINT)
_wait_exception(requests.get, args=[self.api_status])
self.lyrebird_process = None

Expand Down
9 changes: 7 additions & 2 deletions lyrebird/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def emit(event, *args, **kwargs):
context.application.socket_io.emit(event, *args, **kwargs)


def subscribe(channel, func, *args, **kwargs):
def subscribe(channel, func, name='', *args, **kwargs):
"""
订阅信号
Expand All @@ -42,7 +42,12 @@ def subscribe(channel, func, *args, **kwargs):
:param sender: 信号发送者标识
"""
# context.application.event_bus.subscribe(channel, func)
application.server['event'].subscribe(channel, func, *args, **kwargs)
func_info = {
'name': name,
'channel': channel,
'func': func
}
application.server['event'].subscribe(func_info, *args, **kwargs)


def publish(channel, event, *args, **kwargs):
Expand Down
83 changes: 81 additions & 2 deletions lyrebird/application.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import webbrowser

import multiprocessing
from queue import Queue
from flask import jsonify
from functools import reduce

Expand Down Expand Up @@ -75,6 +77,74 @@ def start_server():
def stop_server():
for name in server:
server[name].stop()
sync_manager.broadcast_to_queues(None)

def terminate_server():
for name in server:
server[name].terminate()


class SyncManager():
def __init__(self) -> None:
global sync_namespace
self.manager = multiprocessing.Manager()
self.async_objs = {
'manager_queues': [],
'multiprocessing_queues': [],
'namespace': [],
'locks': []
}
sync_namespace = self.get_namespace()

def get_namespace(self):
namespace = self.manager.Namespace()
self.async_objs['namespace'].append(namespace)
return namespace

def get_queue(self):
queue = self.manager.Queue()
self.async_objs['manager_queues'].append(queue)
return queue

def get_thread_queue(self):
queue = Queue()
return queue

def get_multiprocessing_queue(self):
queue = multiprocessing.Queue()
self.async_objs['multiprocessing_queues'].append(queue)
return queue

def get_lock(self):
lock = multiprocessing.Lock()
self.async_objs['locks'].append(lock)
return lock

def broadcast_to_queues(self, msg):
for q in self.async_objs['multiprocessing_queues']:
q.put(msg)
for q in self.async_objs['manager_queues']:
q.put(msg)

def destory(self):
for q in self.async_objs['multiprocessing_queues']:
q.close()
del q
for q in self.async_objs['manager_queues']:
q._close()
del q
for ns in self.async_objs['namespace']:
del ns
for lock in self.async_objs['locks']:
del lock
self.manager.shutdown()
self.manager.join()
self.manager = None
self.async_objs = None


sync_manager = {}
sync_namespace = {}


class ConfigProxy:
Expand All @@ -89,7 +159,10 @@ def __getitem__(self, k):
return _cm.config[k]

def raw(self):
return _cm.config
if hasattr(_cm.config, 'raw'):
return _cm.config.raw()
else:
return _cm.config


config = ConfigProxy()
Expand Down Expand Up @@ -151,6 +224,8 @@ def status_listener(event):
module_status = system.get('status')
if module_status == 'READY':
status_checkpoints[module] = True
else:
status_checkpoints[module] = False

is_all_status_checkpoints_ok = reduce(lambda x, y: x and y, status_checkpoints.values())
if is_all_status_checkpoints_ok:
Expand All @@ -165,7 +240,11 @@ def status_listener(event):
webbrowser.open(f'http://localhost:{config["mock.port"]}')

def process_status_listener():
server['event'].subscribe('system', status_listener)
server['event'].subscribe({
'name': 'status_listener',
'channel': 'system',
'func': status_listener
})


def status_ready():
Expand Down
109 changes: 77 additions & 32 deletions lyrebird/base_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@
Base threading server class
"""

import inspect
from threading import Thread
from multiprocessing import Process, Queue
from multiprocessing import Process
from lyrebird import application


service_msg_queue = Queue()
def check_process_server_run_function_compatibility(function):
# Check whether the run method is an old or new version by params.
if len(inspect.signature(function).parameters) == 4 and list(inspect.signature(function).parameters.keys())[0] == 'async_obj':
return True
else:
return False


service_msg_queue = None


class ProcessServer:
Expand All @@ -16,54 +25,80 @@ def __init__(self):
self.running = False
self.name = None
self.event_thread = None
self.async_obj = {}
self.args = []
self.kwargs = {}

def run(self, msg_queue, config, log_queue, *args, **kwargs):
def run(self, async_obj, config, *args, **kwargs):
'''
msg_queue
message queue for process server and main process
#1. Send event to main process,
{
"type": "event",
"channel": "",
"content": {}
}
#2. Send message to frontend
support channel: msgSuccess msgInfo msgError
{
"type": "ws",
"channel": "",
"content": ""
}
config
lyrebird config dict
log_queue
send log msg to logger process
async_obj is a dict
used to pass in all objects used for synchronization/communication between multiple processes
Usually msg_queue, config and log_queue is included
msg_queue:
message queue for process server and main process
#1. Send event to main process,
{
"type": "event",
"channel": "",
"content": {}
}
#2. Send message to frontend
support channel: msgSuccess msgInfo msgError
{
"type": "ws",
"channel": "",
"content": ""
}
config:
lyrebird config dict
log_queue:
send log msg to logger process
'''
pass

def start(self):
if self.running:
return

from lyrebird.log import get_logger
logger = get_logger()

global service_msg_queue
config = application.config.raw()
if service_msg_queue is None:
service_msg_queue = application.sync_manager.get_multiprocessing_queue()
config = application._cm.config
logger_queue = application.server['log'].queue
self.server_process = Process(group=None, target=self.run,
args=[service_msg_queue, config, logger_queue, self.args],
kwargs=self.kwargs,
daemon=True)

# run method has too many arguments. Merge the msg_queue, log_queue and so on into async_obj
# This code is used for compatibility with older versions of the run method in the plugin
# This code should be removed after all upgrades have been confirmed
if check_process_server_run_function_compatibility(self.run):
self.async_obj['logger_queue'] = logger_queue
self.async_obj['msg_queue'] = service_msg_queue
self.server_process = Process(group=None, target=self.run,
args=[self.async_obj, config, self.args],
kwargs=self.kwargs,
daemon=True)
else:
logger.warning(f'The run method in {type(self).__name__} is an old parameter format that will be removed in the future')
self.server_process = Process(group=None, target=self.run,
args=[service_msg_queue, config, logger_queue, self.args],
kwargs=self.kwargs,
daemon=True)
self.server_process.start()
self.running = True

def stop(self):
self.running = False

def terminate(self):
if self.server_process:
self.server_process.terminate()
self.server_process.join()
self.server_process = None


Expand All @@ -84,6 +119,9 @@ def start(self, *args, **kwargs):
def stop(self):
self.running = False
# TODO terminate self.server_thread

def terminate(self):
pass

def run(self):
"""
Expand All @@ -100,16 +138,23 @@ def start(self, *args, **kwargs):
def stop(self):
pass

def terminate(self):
pass


class MultiProcessServerMessageDispatcher(ThreadServer):

def run(self):
global service_msg_queue
if service_msg_queue is None:
service_msg_queue = application.sync_manager.get_multiprocessing_queue()
emit = application.server['mock'].socket_io.emit
publish = application.server['event'].publish

while True:
while self.running:
msg = service_msg_queue.get()
if msg is None:
break
type = msg.get('type')
if type == 'event':
channel = msg.get('channel')
Expand Down
Loading

0 comments on commit 3b11287

Please sign in to comment.