Skip to content

Commit

Permalink
fix unittest bugs (#136)
Browse files Browse the repository at this point in the history
* add favicon

* Sleep longer for unittest.

* fix unittest bugs

* increase sleep seconds after job_memory error
  • Loading branch information
fuyw authored and TomorrowIsAnOtherDay committed Aug 22, 2019
1 parent eca90f1 commit 46f5990
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 21 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
include parl/remote/static/logo.png
include parl/remote/static/favicon.ico
recursive-include parl/remote/templates *.html
recursive-include parl/remote/static/css *.css
recursive-include parl/remote/static/js *.js
13 changes: 10 additions & 3 deletions parl/remote/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, worker_address):
Attributes:
pid (int): Job process ID.
max_memory (float): Maximum memory (MB) can be used by each remote instance.
max_memory (float): Maximum memory (MB) can be used by each remote instance.
"""
self.job_is_alive = True
self.worker_address = worker_address
Expand All @@ -59,10 +59,13 @@ def __init__(self, worker_address):
self.lock = threading.Lock()
self._create_sockets()

process = psutil.Process(self.pid)
self.init_memory = float(process.memory_info()[0]) / (1024**2)

def _create_sockets(self):
"""Create three sockets for each job.
(1) reply_socket(main socket): receives the command(i.e, the function name and args)
(1) reply_socket(main socket): receives the command(i.e, the function name and args)
from the actual class instance, completes the computation, and returns the result of
the function.
(2) job_socket(functional socket): sends job_address and heartbeat_address to worker.
Expand Down Expand Up @@ -134,7 +137,7 @@ def _check_used_memory(self):
if self.max_memory is not None:
process = psutil.Process(self.pid)
used_memory = float(process.memory_info()[0]) / (1024**2)
if used_memory > self.max_memory:
if used_memory > self.max_memory + self.init_memory:
stop_job = True
return stop_job

Expand Down Expand Up @@ -174,6 +177,10 @@ def _reply_client_heartbeat(self, socket):
to_byte(self.job_address)
])
if stop_job == True:
logger.error(
"Memory used by this job exceeds {}. This job will exist."
.format(self.max_memory))
time.sleep(3)
socket.close(0)
os._exit(1)
except zmq.error.Again as e:
Expand Down
5 changes: 2 additions & 3 deletions parl/remote/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def start_master(port, cpu_num, monitor_port):
## If you want to check cluster status, please view:
http://{}:{}.
http://{}:{}
or call:
Expand Down Expand Up @@ -196,8 +196,7 @@ def status():
status = []
for monitor in monitors:
monitor_port, _, master_address = monitor.split(' ')
master_ip = master_address.split(':')[0]
monitor_address = "{}:{}".format(master_ip, monitor_port)
monitor_address = "{}:{}".format(get_ip_address(), monitor_port)
socket = ctx.socket(zmq.REQ)
socket.connect('tcp://{}'.format(master_address))
socket.send_multipart([STATUS_TAG])
Expand Down
Binary file added parl/remote/static/favicon.ico
Binary file not shown.
3 changes: 2 additions & 1 deletion parl/remote/templates/clients.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<head>
<meta charset="utf-8" />
<title>Parl Cluster</title>
<link rel="shortcut icon" href="../static/favicon.ico">
<script type="text/javascript" src="../static/js/jquery.min.js"></script>
<script src="../static/js/echarts.min.js"></script>
<script src="../static/js/parl.js"></script>
Expand Down Expand Up @@ -39,7 +40,7 @@ <h5 class="font-weight-light text-center text-lg-left mt-4 mb-4">
<tr>
<th scope="col">#</th>
<th scope="col">Path</th>
<th scope="col">Client ID</th>
<th scope="col">Hostname</th>
<th scope="col">Actor Num</th>
<th scope="col">Time (min)</th>
</tr>
Expand Down
2 changes: 2 additions & 0 deletions parl/remote/templates/workers.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
<head>
<meta charset="utf-8" />
<title>Parl Cluster</title>
<link rel="shortcut icon" href="../static/favicon.ico">

<script type="text/javascript" src="../static/js/jquery.min.js"></script>
<script src="../static/js/echarts.min.js"></script>
<script src="../static/js/parl.js"></script>
Expand Down
33 changes: 23 additions & 10 deletions parl/remote/tests/actor_max_memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,54 @@
from parl.remote.client import disconnect
from parl.remote.monitor import ClusterMonitor

from multiprocessing import Process

@parl.remote_class(max_memory=200)

@parl.remote_class(max_memory=300)
class Actor(object):
def __init__(self, x=10):
self.x = x
self.data = []

def add_100mb(self):
self.data.append(os.urandom(100 * 1024**2))
def add_500mb(self):
self.data.append(os.urandom(500 * 1024**2))
self.x += 1
return self.x


from parl.utils import logger


class TestMaxMemory(unittest.TestCase):
def tearDown(self):
disconnect()

def actor(self):
actor1 = Actor()
time.sleep(10)
actor1.add_500mb()

def test_max_memory(self):
port = 3001
master = Master(port=port)
th = threading.Thread(target=master.run)
th.start()
time.sleep(1)
time.sleep(5)
worker = Worker('localhost:{}'.format(port), 1)
cluster_monitor = ClusterMonitor('localhost:{}'.format(port))
time.sleep(1)
time.sleep(5)
parl.connect('localhost:{}'.format(port))
actor = Actor()
time.sleep(30)
time.sleep(20)
self.assertEqual(1, cluster_monitor.data['clients'][0]['actor_num'])
actor.add_100mb()
time.sleep(50)
self.assertEqual(0, cluster_monitor.data['clients'][0]['actor_num'])
actor.job_socket.close(0)
del actor
time.sleep(5)
p = Process(target=self.actor)
p.start()
time.sleep(30)
self.assertEqual(0, cluster_monitor.data['clients'][0]['actor_num'])
p.terminate()

worker.exit()
master.exit()

Expand Down
6 changes: 3 additions & 3 deletions parl/remote/tests/cluster_status_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from parl.remote.monitor import ClusterMonitor


@parl.remote_class(max_memory=200)
@parl.remote_class(max_memory=300)
class Actor(object):
def __init__(self, x=10):
self.x = x
Expand All @@ -45,14 +45,14 @@ def test_cluster_status(self):
master = Master(port=port)
th = threading.Thread(target=master.run)
th.start()
time.sleep(1)
time.sleep(5)
worker = Worker('localhost:{}'.format(port), 1)
time.sleep(5)
status_info = master.cluster_monitor.get_status_info()
self.assertEqual(status_info, 'has 0 used cpus, 1 vacant cpus.')
parl.connect('localhost:{}'.format(port))
actor = Actor()
time.sleep(30)
time.sleep(50)
status_info = master.cluster_monitor.get_status_info()
self.assertEqual(status_info, 'has 1 used cpus, 0 vacant cpus.')
worker.exit()
Expand Down
2 changes: 1 addition & 1 deletion parl/remote/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _get_worker_status(self):
total_memory = round(virtual_memory[0] / (1024**3), 2)
used_memory = round(virtual_memory[3] / (1024**3), 2)
vacant_memory = round(total_memory - used_memory, 2)
load_average = round(psutil.getloadavg()[0], 2)
load_average = round(os.getloadavg()[0], 2)
return (vacant_memory, used_memory, now, load_average)

def _reply_heartbeat(self, target):
Expand Down

0 comments on commit 46f5990

Please sign in to comment.