From 27ea61b43fc89aae1e3a3ba4dcf16f41dfe651f2 Mon Sep 17 00:00:00 2001 From: Liam Staskawicz Date: Tue, 28 Jul 2020 22:48:06 -0700 Subject: [PATCH 1/4] connections: refactor, clarify selectors use --- cheroot/connections.py | 137 ++++++++++++++++------------------ cheroot/server.py | 25 +++---- cheroot/workers/threadpool.py | 5 -- 3 files changed, 77 insertions(+), 90 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 89fd204e56..1d550af23c 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import collections import io import os import socket @@ -61,9 +62,11 @@ def __init__(self, server): that uses this ConnectionManager instance. """ self.server = server - self.connections = [] + self._readable_conns = collections.deque() self._selector = selectors.DefaultSelector() + self._selector.register(server.socket.fileno(), selectors.EVENT_READ, data=server) + def put(self, conn): """Put idle connection into the ConnectionManager to be managed. @@ -72,8 +75,12 @@ def put(self, conn): to be managed. """ conn.last_used = time.time() - conn.ready_with_data = conn.rfile.has_data() - self.connections.append(conn) + # if this conn doesn't have any more data waiting to be read, + # register it with the selector. + if conn.rfile.has_data(): + self._readable_conns.append(conn) + else: + self._selector.register(conn.socket.fileno(), selectors.EVENT_READ, data=conn) def expire(self): """Expire least recently used connections. @@ -83,21 +90,20 @@ def expire(self): This should be called periodically. """ - if not self.connections: - return - # Look at the first connection - if it can be closed, then do - # that, and wait for get_conn to return it. - conn = self.connections[0] - if conn.closeable: - return + # find any connections still registered with the selector + # that have not been active recently enough. + now = time.time() + for _, key in self._selector.get_map().items(): + if key.data == self.server: + continue - # Connection too old? - if (conn.last_used + self.server.timeout) < time.time(): - conn.closeable = True - return + conn = key.data + if (conn.last_used + self.server.timeout) < now: + self._selector.unregister(key.fd) + conn.close() - def get_conn(self, server_socket): + def get_conn(self): """Return a HTTPConnection object which is ready to be handled. A connection returned by this method should be ready for a worker @@ -107,88 +113,60 @@ def get_conn(self, server_socket): Any connection returned by this method will need to be `put` back if it should be examined again for another request. - Args: - server_socket (socket.socket): Socket to listen to for new - connections. Returns: cheroot.server.HTTPConnection instance, or None. """ # Grab file descriptors from sockets, but stop if we find a # connection which is already marked as ready. - socket_dict = {} - for conn in self.connections: - if conn.closeable or conn.ready_with_data: - break - socket_dict[conn.socket.fileno()] = conn - else: - # No ready connection. - conn = None - # We have a connection ready for use. - if conn: - self.connections.remove(conn) - return conn + try: + return self._readable_conns.popleft() + except IndexError: + pass # Will require a select call. - ss_fileno = server_socket.fileno() - socket_dict[ss_fileno] = server_socket try: - for fno in socket_dict: - self._selector.register(fno, selectors.EVENT_READ) # The timeout value impacts performance and should be carefully # chosen. Ref: # github.com/cherrypy/cheroot/issues/305#issuecomment-663985165 rlist = [ - key.fd for key, _event + key for key, _ in self._selector.select(timeout=0.01) ] except OSError: - # Mark any connection which no longer appears valid. - for fno, conn in list(socket_dict.items()): + # Mark any connection which no longer appears valid + for _, key in self._selector.get_map().items(): # If the server socket is invalid, we'll just ignore it and # wait to be shutdown. - if fno == ss_fileno: + if key.data == self.server: continue + try: - os.fstat(fno) + os.fstat(key.fd) except OSError: - # Socket is invalid, close the connection, insert at - # the front. - self.connections.remove(conn) - self.connections.insert(0, conn) - conn.closeable = True + # Socket is invalid, close the connection + self._selector.unregister(key.fd) + conn.close() # Wait for the next tick to occur. return None - finally: - for fno in socket_dict: - self._selector.unregister(fno) - try: - # See if we have a new connection coming in. - rlist.remove(ss_fileno) - except ValueError: - # If we didn't get any readable sockets, wait for the next tick - if not rlist: - return None - - # No new connection, but reuse an existing socket. - conn = socket_dict[rlist.pop()] - else: - # If we have a new connection, reuse the server socket - conn = server_socket - - # All remaining connections in rlist should be marked as ready. - for fno in rlist: - socket_dict[fno].ready_with_data = True + for key in rlist: + if key.data is self.server: + # New connection + return self._from_server_socket(self.server.socket) - # New connection. - if conn is server_socket: - return self._from_server_socket(server_socket) + conn = key.data + # unregister connection from the selector until the server + # has read from it and returned it via put() + self._selector.unregister(key.fd) + self._readable_conns.append(conn) - self.connections.remove(conn) - return conn + try: + return self._readable_conns.popleft() + except IndexError: + return None def _from_server_socket(self, server_socket): try: @@ -282,12 +260,27 @@ def _from_server_socket(self, server_socket): def close(self): """Close all monitored connections.""" - for conn in self.connections[:]: + for conn in self._readable_conns: conn.close() - self.connections = [] + self._readable_conns.clear() + + for _, key in self._selector.get_map().items(): + if key.data != self.server: # server closes its own socket + key.data.socket.close() + + self._selector.close() + + def _num_connections(self): + """Return the current number of connections. + + Includes any in the readable list or registered with the selector, + minus one for the server socket, which is always registered + with the selector. + """ + return len(self._readable_conns) + len(self._selector.get_map()) - 1 @property def can_add_keepalive_connection(self): """Flag whether it is allowed to add a new keep-alive connection.""" ka_limit = self.server.keep_alive_conn_limit - return ka_limit is None or len(self.connections) < ka_limit + return ka_limit is None or self._num_connections() < ka_limit diff --git a/cheroot/server.py b/cheroot/server.py index 70623cdfe4..5ec3002a92 100644 --- a/cheroot/server.py +++ b/cheroot/server.py @@ -1225,9 +1225,7 @@ class HTTPConnection: peercreds_resolve_enabled = False # Fields set by ConnectionManager. - closeable = False last_used = None - ready_with_data = False def __init__(self, server, sock, makefile=MakeFile): """Initialize HTTPConnection instance. @@ -1581,7 +1579,7 @@ def __init__( self.requests = threadpool.ThreadPool( self, min=minthreads or 1, max=maxthreads, ) - self.connections = connections.ConnectionManager(self) + self.serving = False if not server_name: server_name = self.version @@ -1775,6 +1773,8 @@ def prepare(self): self.socket.settimeout(1) self.socket.listen(self.request_queue_size) + self.connections = connections.ConnectionManager(self) + # Create worker threads self.requests.start() @@ -1783,6 +1783,7 @@ def prepare(self): def serve(self): """Serve requests, after invoking :func:`prepare()`.""" + self.serving = True while self.ready: try: self.tick() @@ -1794,12 +1795,7 @@ def serve(self): traceback=True, ) - if self.interrupt: - while self.interrupt is True: - # Wait for self.stop() to complete. See _set_interrupt. - time.sleep(0.1) - if self.interrupt: - raise self.interrupt + self.serving = False def start(self): """Run the server forever. @@ -2017,10 +2013,7 @@ def resolve_real_bind_addr(socket_): def tick(self): """Accept a new connection and put it on the Queue.""" - if not self.ready: - return - - conn = self.connections.get_conn(self.socket) + conn = self.connections.get_conn() if conn: try: self.requests.put(conn) @@ -2041,6 +2034,8 @@ def interrupt(self, interrupt): self._interrupt = True self.stop() self._interrupt = interrupt + if self._interrupt: + raise self.interrupt def stop(self): """Gracefully shutdown a server that is serving forever.""" @@ -2049,6 +2044,10 @@ def stop(self): self._run_time += (time.time() - self._start_time) self._start_time = None + # ensure serve is no longer accessing socket, connections + while self.serving: + time.sleep(0.1) + sock = getattr(self, 'socket', None) if sock: if not isinstance( diff --git a/cheroot/workers/threadpool.py b/cheroot/workers/threadpool.py index 4466e7a143..b9987d9de3 100644 --- a/cheroot/workers/threadpool.py +++ b/cheroot/workers/threadpool.py @@ -111,11 +111,6 @@ def run(self): if conn is _SHUTDOWNREQUEST: return - # Just close the connection and move on. - if conn.closeable: - conn.close() - continue - self.conn = conn is_stats_enabled = self.server.stats['Enabled'] if is_stats_enabled: From d1fdf9f108abda9be8860690ca4ab3b7b40a5aa2 Mon Sep 17 00:00:00 2001 From: Liam Staskawicz Date: Tue, 28 Jul 2020 23:25:04 -0700 Subject: [PATCH 2/4] connections: lint --- cheroot/connections.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 1d550af23c..ba87aaccdc 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -65,7 +65,10 @@ def __init__(self, server): self._readable_conns = collections.deque() self._selector = selectors.DefaultSelector() - self._selector.register(server.socket.fileno(), selectors.EVENT_READ, data=server) + self._selector.register( + server.socket.fileno(), + selectors.EVENT_READ, data=server, + ) def put(self, conn): """Put idle connection into the ConnectionManager to be managed. @@ -80,7 +83,9 @@ def put(self, conn): if conn.rfile.has_data(): self._readable_conns.append(conn) else: - self._selector.register(conn.socket.fileno(), selectors.EVENT_READ, data=conn) + self._selector.register( + conn.socket.fileno(), selectors.EVENT_READ, data=conn, + ) def expire(self): """Expire least recently used connections. @@ -147,6 +152,7 @@ def get_conn(self): except OSError: # Socket is invalid, close the connection self._selector.unregister(key.fd) + conn = key.data conn.close() # Wait for the next tick to occur. @@ -265,7 +271,7 @@ def close(self): self._readable_conns.clear() for _, key in self._selector.get_map().items(): - if key.data != self.server: # server closes its own socket + if key.data != self.server: # server closes its own socket key.data.socket.close() self._selector.close() From 3135db560ffa102fd75620bbda9ddef4b22a3c72 Mon Sep 17 00:00:00 2001 From: Liam Staskawicz Date: Tue, 11 Aug 2020 21:22:01 -0700 Subject: [PATCH 3/4] connections: convert _num_connections to property, clarify expire() --- cheroot/connections.py | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index ba87aaccdc..2389297da5 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -11,6 +11,7 @@ from . import errors from ._compat import selectors +from ._compat import suppress from .makefile import MakeFile import six @@ -98,15 +99,15 @@ def expire(self): # find any connections still registered with the selector # that have not been active recently enough. - now = time.time() - for _, key in self._selector.get_map().items(): - if key.data == self.server: - continue - - conn = key.data - if (conn.last_used + self.server.timeout) < now: - self._selector.unregister(key.fd) - conn.close() + threshold = time.time() - self.server.timeout + timed_out_connections = ( + (sock_fd, conn) + for _, (_, sock_fd, _, conn) in self._selector.get_map().items() + if conn != self.server and conn.last_used < threshold + ) + for sock_fd, conn in timed_out_connections: + self._selector.unregister(sock_fd) + conn.close() def get_conn(self): """Return a HTTPConnection object which is ready to be handled. @@ -122,13 +123,9 @@ def get_conn(self): cheroot.server.HTTPConnection instance, or None. """ - # Grab file descriptors from sockets, but stop if we find a - # connection which is already marked as ready. - - try: + # return a readable connection if any exist + with suppress(IndexError): return self._readable_conns.popleft() - except IndexError: - pass # Will require a select call. try: @@ -276,8 +273,9 @@ def close(self): self._selector.close() + @property def _num_connections(self): - """Return the current number of connections. + """The current number of connections. Includes any in the readable list or registered with the selector, minus one for the server socket, which is always registered @@ -289,4 +287,4 @@ def _num_connections(self): def can_add_keepalive_connection(self): """Flag whether it is allowed to add a new keep-alive connection.""" ka_limit = self.server.keep_alive_conn_limit - return ka_limit is None or self._num_connections() < ka_limit + return ka_limit is None or self._num_connections < ka_limit From 213387603287ed2947fb392d070d212a80cfc02a Mon Sep 17 00:00:00 2001 From: Sviatoslav Sydorenko Date: Wed, 12 Aug 2020 21:46:23 +0200 Subject: [PATCH 4/4] Address pydocstyle offences in cheroot.connections --- cheroot/connections.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cheroot/connections.py b/cheroot/connections.py index 2389297da5..b416077c8c 100644 --- a/cheroot/connections.py +++ b/cheroot/connections.py @@ -96,7 +96,6 @@ def expire(self): This should be called periodically. """ - # find any connections still registered with the selector # that have not been active recently enough. threshold = time.time() - self.server.timeout @@ -274,7 +273,7 @@ def close(self): self._selector.close() @property - def _num_connections(self): + def _num_connections(self): # noqa: D401 """The current number of connections. Includes any in the readable list or registered with the selector,