Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support to behaviours to block when the Agent message queue is empty #50

Merged
merged 3 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pade/behaviours/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def done(self):
'''
pass

def wait(self, timeout):
def block(self, timeout):
''' This method sleeps a behaviour until occurs a timeout. The
behaviour will execute normally afterwards.
'''
Expand Down
4 changes: 4 additions & 0 deletions pade/behaviours/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def done(self):
'''
pass

def block(self):
''' Blocks the behaviour until a new message arrive.
'''
self.agent.message_event.wait()

class OneShotBehaviour(BaseBehaviour):

Expand Down
35 changes: 35 additions & 0 deletions pade/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ class Agent(Agent_):
Messages received by the agent
messages_lock : threading.Lock
Lock the message stack during operations on the stack
message_event : threading.Event
The Event object that unblock the blocked behaviours when a message
arrives.
"""

def __init__(self, aid, debug=False, ignore_ams=True, wait_time=300):
Expand Down Expand Up @@ -762,6 +765,8 @@ def __init__(self, aid, debug=False, ignore_ams=True, wait_time=300):
self.messages = deque()
# The lock object to ensure thread-safe queue reading
self.messages_lock = threading.Lock()
# An event handler to be launched when a message arrives
self.message_event = threading.Event()

def update_ams(self, ams):
"""Summary
Expand Down Expand Up @@ -806,6 +811,36 @@ def react(self, message):
with self.messages_lock:
if not self.ignore_ams or not message.system_message:
self.messages.appendleft(message)
self.message_event.set()
# Clears the message event in order to block the behaviours again
self.message_event.clear()


def receive(self, message_filter = None):
'''
Get the first message from the top of the message stack (left-side of
the deque) with match the filters provided.

If any filter was provided, return the message from the top of stack.
Parameters
----------
messageFilters : Filter
filter to be applied at the messages
'''

with self.messages_lock:
# Checks if has message without launch another lock. This ensures
# that the method never will raise an 'empty queue' exception.
if len(self.messages) > 0:
if message_filter == None:
return self.messages.popleft()

for message in self.messages:
if message_filter.filter(message):
self.messages.remove(message)
return message
return None


def receive(self, message_filter = None):
'''
Expand Down