Skip to content

Commit

Permalink
Only use multiprocessing.Queue workaround on OSX, rename it from bugf…
Browse files Browse the repository at this point in the history
…ix_mp for aesthetics
  • Loading branch information
Tom committed Jul 13, 2015
1 parent b4d71b5 commit 2a2e421
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 32 deletions.
6 changes: 3 additions & 3 deletions MAVProxy/modules/lib/mp_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ def __init__(self,
self.menu = None
self.popup_menu = None

from bugfix_mp import mpQueue
self.in_queue = mpQueue()
self.out_queue = mpQueue()
from multiprocessing_queue import makeIPCQueue
self.in_queue = makeIPCQueue()
self.out_queue = makeIPCQueue()

self.default_menu = MPMenuSubMenu('View',
items=[MPMenuItem('Fit Window', 'Fit Window', 'fitWindow'),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,8 @@
import multiprocessing

class mpCounter(object):
""" Locked/shared counter """

def __init__(self, default=0):
self._value = multiprocessing.RawValue('i', default)
self._lock = multiprocessing.Lock()

def add(self, delta):
with self._lock:
self._value.value += delta

@property
def value(self):
with self._lock:
return self._value.value
import sys

from multiprocessing.queues import Queue
class mpQueue(Queue):
class osxQueue(Queue):
""" Wrapper around mp.Queue that works on OSX...
Queue.qsize() relies on sem_getvalue() to get queue size,
Expand All @@ -26,18 +11,28 @@ class mpQueue(Queue):

def __init__(self, *args, **kwargs):
Queue.__init__(self, *args, **kwargs)
self.qsizeCounter = mpCounter(0)
self._counter = multiprocessing.RawValue('i', 0)
self._lock = multiprocessing.Lock()

def put(self, *args, **kwargs):
# If the put fails, the exception will prevent us from incrementing qsizeCounter
# If the put fails, the exception will prevent us from incrementing the counter
Queue.put(self, *args, **kwargs)
self.qsizeCounter.add(1)
with self._lock:
self._counter.value += 1

def get(self, *args, **kwargs):
# If the get fails, the exception will prevent us from decrementing qsizeCounter
# If the get fails, the exception will prevent us from decrementing the counter
val = Queue.get(self, *args, **kwargs)
self.qsizeCounter.add(-1)
with self._lock:
self._counter.value -= 1
return val

def qsize(self):
return self.qsizeCounter.value
with self._lock:
return self._counter.value

def makeIPCQueue(*args, **kwargs):
if sys.platform == 'darwin':
return osxQueue(*args, **kwargs)
else:
return Queue(*args, **kwargs)
6 changes: 3 additions & 3 deletions MAVProxy/modules/mavproxy_map/mp_slipmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def __init__(self,
self.drag_step = 10

self.title = title
from ..lib.bugfix_mp import mpQueue
self.event_queue = mpQueue()
self.object_queue = mpQueue()
from ..lib.multiprocessing_queue import makeIPCQueue
self.event_queue = makeIPCQueue()
self.object_queue = makeIPCQueue()
self.close_window = multiprocessing.Event()
self.close_window.clear()
self.child = multiprocessing.Process(target=self.child_task)
Expand Down
6 changes: 3 additions & 3 deletions MAVProxy/modules/mavproxy_misseditor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ def __init__(self, mpstate):
self.num_wps_expected = 0 #helps me to know if all my waypoints I'm expecting have arrived
self.wps_received = {}

from ..lib.bugfix_mp import mpQueue
self.event_queue = mpQueue()
from ..lib.multiprocessing_queue import makeIPCQueue
self.event_queue = makeIPCQueue()
self.event_queue_lock = multiprocessing.Lock()
self.gui_event_queue = mpQueue()
self.gui_event_queue = makeIPCQueue()
self.gui_event_queue_lock = multiprocessing.Lock()

self.event_thread = MissionEditorEventThread(self, self.event_queue, self.event_queue_lock)
Expand Down

0 comments on commit 2a2e421

Please sign in to comment.