From 12a60cd4f464434f4e1889d1963b16382c0c438b Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 6 Nov 2024 16:21:28 -0500 Subject: [PATCH 01/25] Get functional Sockets and Task Management. --- cave_app/asgi.py | 15 +- cave_app/settings/development.py | 9 + cave_core/management/commands/cache_test.py | 13 +- cave_core/management/commands/clearcache.py | 2 +- cave_core/models.py | 20 +- cave_core/utils/__init__.py | 1 - cave_core/utils/wrapping.py | 4 +- cave_core/websockets/__init__.py | 2 +- cave_core/websockets/api_endpoints.py | 7 +- cave_core/websockets/app.py | 12 +- .../cave_ws_broadcaster.py} | 25 +- cave_core/websockets/consumer.py | 96 ------- .../websockets/django_sockets/__init__.py | 0 .../websockets/django_sockets/broadcaster.py | 94 ++++++ .../websockets/django_sockets/middleware.py | 39 +++ cave_core/websockets/django_sockets/pubsub.py | 268 ++++++++++++++++++ .../websockets/django_sockets/sockets.py | 130 +++++++++ cave_core/websockets/django_sockets/utils.py | 167 +++++++++++ cave_core/websockets/middleware.py | 28 -- cave_core/websockets/socket_server.py | 24 ++ cave_core/websockets/urls.py | 6 - 21 files changed, 783 insertions(+), 179 deletions(-) rename cave_core/{utils/broadcasting.py => websockets/cave_ws_broadcaster.py} (87%) delete mode 100644 cave_core/websockets/consumer.py create mode 100644 cave_core/websockets/django_sockets/__init__.py create mode 100644 cave_core/websockets/django_sockets/broadcaster.py create mode 100644 cave_core/websockets/django_sockets/middleware.py create mode 100644 cave_core/websockets/django_sockets/pubsub.py create mode 100644 cave_core/websockets/django_sockets/sockets.py create mode 100644 cave_core/websockets/django_sockets/utils.py delete mode 100644 cave_core/websockets/middleware.py create mode 100644 cave_core/websockets/socket_server.py delete mode 100644 cave_core/websockets/urls.py diff --git a/cave_app/asgi.py b/cave_app/asgi.py index d8a1d1b..6d0a740 100755 --- a/cave_app/asgi.py +++ b/cave_app/asgi.py @@ -6,24 +6,23 @@ import os +# This is needed to set the default settings module prior to importing the get_asgi_application if os.environ.get("DJANGO_SETTINGS_MODULE") is None: print("No DJANGO_SETTINGS_MODULE specified. Defaulting to `cave_app.settings.development`") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cave_app.settings.development") from django.core.asgi import get_asgi_application -from channels.routing import ProtocolTypeRouter +from cave_core.websockets.django_sockets.utils import ProtocolTypeRouter +from cave_core.websockets.app import get_ws_asgi_application -django_asgi_app = get_asgi_application() - -from cave_core.websockets import get_ws_asgi_application - -# Initialize asgi app items before calling any local imports -# get_asgi_application should always be called before local import here +# Initialize asgi app items when the app starts +# This needs to happen here and not in the protocol router +asgi_app = get_asgi_application() ws_asgi_app = get_ws_asgi_application() application = ProtocolTypeRouter( { - "http": django_asgi_app, + "http": asgi_app, "websocket": ws_asgi_app, } ) diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index 8b1834f..f75b564 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -172,6 +172,15 @@ } ################################################################ +# DJANGO_SOCKETS_CONFIG +################################################################ +DJANGO_SOCKETS_CONFIG = { + "hosts": [ + {"address": f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}"} + ], +} +################################################################ + # Caching ################################################################ diff --git a/cave_core/management/commands/cache_test.py b/cave_core/management/commands/cache_test.py index 0a14a55..a9b762b 100644 --- a/cave_core/management/commands/cache_test.py +++ b/cave_core/management/commands/cache_test.py @@ -1,9 +1,13 @@ from django.core.management.base import BaseCommand from cave_core.utils.cache import Cache -from channels.layers import get_channel_layer +from cave_core.websockets.django_sockets.sockets import BaseSocketServer from asgiref.sync import async_to_sync +class Socket_Server(BaseSocketServer): + def receive(self, data): + print('Receiving:', data) + class Command(BaseCommand): help = 'Clearing the Cache' @@ -12,7 +16,6 @@ def handle(self, *args, **options): cache.set("test", "test value") print(cache.get("test")) - channel_layer = get_channel_layer() - async_to_sync(channel_layer.send)("test_channel", {"type": "test.message", "text": "Hello Redis!"}) - message = async_to_sync(channel_layer.receive)("test_channel") - print("Message received from Redis:", message) + socket_server = Socket_Server.as_asgi()(None, None, None) + socket_server.subscribe("test_channel") + socket_server.broadcast("test_channel", "test message") diff --git a/cave_core/management/commands/clearcache.py b/cave_core/management/commands/clearcache.py index d7a4166..996d284 100644 --- a/cave_core/management/commands/clearcache.py +++ b/cave_core/management/commands/clearcache.py @@ -9,6 +9,6 @@ def handle(self, *args, **options): self.stdout.write("Clearing the Cache (memory and persistent)...") try: cache = Cache() - cache.clear() + cache.delete_pattern("*") except Exception as e: raise CommandError(f"Failed to clear the cache with the following error: {e}") \ No newline at end of file diff --git a/cave_core/models.py b/cave_core/models.py index 0f847a6..6318f17 100755 --- a/cave_core/models.py +++ b/cave_core/models.py @@ -12,7 +12,7 @@ import type_enforced # Internal Imports -from cave_core.utils.broadcasting import Socket +from cave_core.websockets.cave_ws_broadcaster import CaveWSBroadcaster from cave_core.utils.cache import Cache from cave_core.utils.constants import api_keys, background_api_keys from cave_core.utils.validators import limit_upload_size @@ -230,7 +230,7 @@ def get_user_ids(self): """ Used to get the current user id in a list by itself. - Required by Socket for generic functionaility. + Required by CaveWSBroadcaster for generic functionaility. """ return [self.id] @@ -259,11 +259,11 @@ def broadcast_current_session_info(self): """ Let the user know their current session info (id and loading status) """ - Socket(self).broadcast( + CaveWSBroadcaster(self).broadcast( event="updateSessions", data={"data_path": ["session_id"], "data": self.session.id}, ) - Socket(self).broadcast( + CaveWSBroadcaster(self).broadcast( event="updateLoading", data={ "data_path": ["session_loading"], @@ -839,7 +839,7 @@ def get_sessions(self): def update_sessions_list(self): sessions = self.get_sessions() self.set_session_count(len(sessions)) - Socket(self).broadcast( + CaveWSBroadcaster(self).broadcast( event="updateSessions", data={ "data_path": ["data", str(self.id)], @@ -931,7 +931,7 @@ def broadcast_loading(self, loading: bool) -> None: - Type: bool - What: The loading status to broadcast """ - Socket(self).broadcast( + CaveWSBroadcaster(self).broadcast( event="updateLoading", data={ "data_path": ["session_loading"], @@ -1066,7 +1066,7 @@ def get_data(self, keys:list[str]=None, client_only:bool=True) -> dict: # If the new data is not the same length as the keys to get from the cache, there was an error # Likely, some data was lost from the persistent cache if len(new_data.keys()) != len(keys_to_get_from_cache): - Socket(self).notify( + CaveWSBroadcaster(self).notify( title="Error:", message="Oops! There was an error with the data from this session. It will be reset to initial values to fix the issue.", theme="error", @@ -1105,11 +1105,13 @@ def broadcast_changed_data(self, previous_versions: dict) -> None: ] data = self.get_data(client_only=True, keys=updated_keys) # Broadcast the updated versions and data - Socket(self).broadcast( + self.broadcast_loading(True) + CaveWSBroadcaster(self).broadcast( event="overwrite", versions=versions, data=data, ) + self.broadcast_loading(False) # print('==BROADCAST CHANGED DATA END==') def replace_data(self, data, wipeExisting): @@ -1190,7 +1192,7 @@ def execute_api_command( # print('\n==EXECUTE API COMMAND==') self.set_loading(True) session_data = self.get_data(keys=command_keys, client_only=False) - socket = Socket(self) + socket = CaveWSBroadcaster(self) command_output = execute_command( session_data=session_data, command=command, socket=socket, mutate_dict=mutate_dict ) diff --git a/cave_core/utils/__init__.py b/cave_core/utils/__init__.py index 333e54c..1b07522 100755 --- a/cave_core/utils/__init__.py +++ b/cave_core/utils/__init__.py @@ -1,5 +1,4 @@ from cave_core.utils import ( - broadcasting, constants, emailing, timing, diff --git a/cave_core/utils/wrapping.py b/cave_core/utils/wrapping.py index 1ddc24c..282e006 100755 --- a/cave_core/utils/wrapping.py +++ b/cave_core/utils/wrapping.py @@ -9,7 +9,7 @@ import traceback # Internal Imports -from cave_core.utils.broadcasting import Socket +from cave_core.websockets.cave_ws_broadcaster import CaveWSBroadcaster def format_exception(e): @@ -95,7 +95,7 @@ def wrap(request): if settings.DEBUG: print(traceback_str) # Notify the user of the exception - Socket(session).notify( + CaveWSBroadcaster(session).notify( message=str(e), title="Error:", show=True, diff --git a/cave_core/websockets/__init__.py b/cave_core/websockets/__init__.py index 70d81bc..8b13789 100644 --- a/cave_core/websockets/__init__.py +++ b/cave_core/websockets/__init__.py @@ -1 +1 @@ -from .app import get_ws_asgi_application + diff --git a/cave_core/websockets/api_endpoints.py b/cave_core/websockets/api_endpoints.py index 1979ed1..616142a 100644 --- a/cave_core/websockets/api_endpoints.py +++ b/cave_core/websockets/api_endpoints.py @@ -1,6 +1,5 @@ # Internal Imports -from cave_core import models -from cave_core.utils.broadcasting import Socket +from cave_core.websockets.cave_ws_broadcaster import CaveWSBroadcaster from cave_core.utils.wrapping import cache_data_version, ws_api_app @@ -135,7 +134,7 @@ def mutate_session(request): # In the case of a synch_error broadcast a version fix to the user # and break from any more session work if response.get("synch_error"): - Socket(request.user).notify( + CaveWSBroadcaster(request.user).notify( message="Oops! You are out of sync. Fix in progress...", title="Warning:", show=True, @@ -156,7 +155,7 @@ def mutate_session(request): ) # If no api command is provided, apply the mutation else: - Socket(session_i).broadcast( + CaveWSBroadcaster(session_i).broadcast( event="mutation", versions=session_i.get_versions(), data=mutate_dict, diff --git a/cave_core/websockets/app.py b/cave_core/websockets/app.py index 956b7ee..e63cc15 100644 --- a/cave_core/websockets/app.py +++ b/cave_core/websockets/app.py @@ -1,7 +1,11 @@ -from channels.routing import URLRouter -from .middleware import TokenAuthMiddleware -from .urls import websocket_urlpatterns +from .django_sockets.utils import URLRouter +from .django_sockets.middleware import DRFTokenAuthMiddleware +from django.urls import path +from .socket_server import SocketServer +websocket_urlpatterns = [ + path("ws/", SocketServer.as_asgi()), +] def get_ws_asgi_application(): - return TokenAuthMiddleware(URLRouter(websocket_urlpatterns)) + return DRFTokenAuthMiddleware(URLRouter(websocket_urlpatterns)) diff --git a/cave_core/utils/broadcasting.py b/cave_core/websockets/cave_ws_broadcaster.py similarity index 87% rename from cave_core/utils/broadcasting.py rename to cave_core/websockets/cave_ws_broadcaster.py index f54d3d2..10d3a6b 100755 --- a/cave_core/utils/broadcasting.py +++ b/cave_core/websockets/cave_ws_broadcaster.py @@ -1,13 +1,11 @@ -# Framework Imports -from channels.layers import get_channel_layer - # External Imports -from asgiref.sync import async_to_sync -import json, type_enforced +import type_enforced + +from .django_sockets.broadcaster import Broadcaster -channel_layer = get_channel_layer() -sync_send = async_to_sync(channel_layer.group_send) +broadcaster = Broadcaster() +# Constants acceptable_events = set( [ "mutation", @@ -18,11 +16,13 @@ "export", ] ) - theme_list = set(["primary", "secondary", "error", "warning", "info", "success"]) +# Loading Events +loading_true = {"event": "updateLoading", "data": {"data_path": ["data_loading"], "data": True}} +loading_false = {"event": "updateLoading", "data": {"data_path": ["data_loading"], "data": False}} -class Socket: +class CaveWSBroadcaster: def __init__(self, model_object): self.model_object = model_object @@ -51,7 +51,7 @@ def format_broadcast_payload(self, event: str, data: dict, **kwargs): ) if not isinstance(data, dict): raise TypeError(f"Invalid `data` type ('{type(data)}'). `data` must be a dict.") - return json.dumps({"event": event, "data": data, **kwargs}) + return {"event": event, "data": data, **kwargs} def broadcast(self, event: str, data: dict, **kwargs): """ @@ -68,11 +68,8 @@ def broadcast(self, event: str, data: dict, **kwargs): - Type: dict - What: The data to broadcast """ - payload = self.format_broadcast_payload(event=event, data=data, **kwargs) - # Note: broadcast_type refers to the function called in consumer.py - broadcast_type = "loadingbroadcast" if event == "overwrite" else "broadcast" for user_id in self.model_object.get_user_ids(): - sync_send(str(user_id), {"type": broadcast_type, "payload": payload}) + broadcaster.broadcast(str(user_id), self.format_broadcast_payload(event=event, data=data, **kwargs)) @type_enforced.Enforcer def notify( diff --git a/cave_core/websockets/consumer.py b/cave_core/websockets/consumer.py deleted file mode 100644 index d25dcda..0000000 --- a/cave_core/websockets/consumer.py +++ /dev/null @@ -1,96 +0,0 @@ -from channels.generic.websocket import WebsocketConsumer as WSConsumer -from django.conf import settings -import json, threading -from asgiref.sync import async_to_sync - -from .commands import get_command - - -def run_in_background(command, *args, **kwargs): - local_thread = threading.Thread(target=command, args=args, kwargs=kwargs) - local_thread.setDaemon(True) - local_thread.start() - - -loading_true = json.dumps( - {"event": "updateLoading", "data": {"data_path": ["data_loading"], "data": True}} -) -loading_false = json.dumps( - {"event": "updateLoading", "data": {"data_path": ["data_loading"], "data": False}} -) - - -class Request: - """ - A simple request object class to mimic the behavior of the request object passed by DRF - """ - - def __init__(self, user, data): - self.data = data - self.user = user - - -class WebsocketConsumer(WSConsumer): - def connect(self): - """ - On WS connection, subscribe to a channel determined by the requesting user's id - """ - self.user_id = str(self.scope["user"].id) - async_to_sync(self.channel_layer.group_add)(self.user_id, self.channel_name) - self.accept() - - def disconnect(self, close_code): - """ - On WS disconnection, remove subscription to the channel determined by the user's id - """ - async_to_sync(self.channel_layer.group_discard)(self.user_id, self.channel_name) - - # Broadcast loading wrapped app messages to clients - def loadingbroadcast(self, event): - """ - Broadcast app messages to clients and add syncronous data loading messages - - Note: Broadcast `event`s should always include a `payload` object that is JSON serialized - """ - # Not async_to_sync since this is already running in an async context - # when called by Socket - self.send(loading_true) - self.send(event.get("payload", "{}")) - self.send(loading_false) - - # Broadcast app messages to clients - def broadcast(self, event): - """ - Broadcast app messages to clients - - Note: Broadcast `event`s should always include a `payload` object that is JSON serialized - """ - # Not async_to_sync since this is already running in an async context - # when called by Socket - self.send(event.get("payload", "{}")) - - # Receive app messages from clients - def receive(self, text_data): - """ - Clients can send requests to the server using the following template: - - ``` - {'command':'command_name_here', 'data':{...}} - ``` - - For example, to get all session data for a websocket (in JS): - - ``` - websocket.send({'command':'get_session_data', 'data':{}}) - ``` - - See the docs for each command to determine the spec that `data` should match. - - In the example above, you can look at the docs for `get_session_data` in the file `cave_core/websockets/api_endpoints.py` - """ - parsed_data = json.loads(text_data) - if settings.DEBUG: - print("WS RECEIVE ", parsed_data.get("command")) - request = Request(self.scope.get("user"), parsed_data.get("data")) - command = get_command(parsed_data.get("command")) - run_in_background(command, request) diff --git a/cave_core/websockets/django_sockets/__init__.py b/cave_core/websockets/django_sockets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cave_core/websockets/django_sockets/broadcaster.py b/cave_core/websockets/django_sockets/broadcaster.py new file mode 100644 index 0000000..3b66e40 --- /dev/null +++ b/cave_core/websockets/django_sockets/broadcaster.py @@ -0,0 +1,94 @@ +import asyncio, logging + +from django.conf import settings + +from .pubsub import PubSubLayer +from .utils import ensure_loop_running + +logger = logging.getLogger(__name__) + +class Broadcaster: + def __init__(self, *args, loop=None, **kwargs): + self.__loop__ = ensure_loop_running(loop) + self.pubsub_layer = self.__get_pubsub_layer__() + self.__usable__ = self.pubsub_layer is not None + + def __get_pubsub_layer__(self): + """ + A method to get the PubSubLayer given the settings.DJANGO_SOCKETS_CONFIG + """ + if hasattr(settings, "DJANGO_SOCKETS_CONFIG"): + if isinstance(settings.DJANGO_SOCKETS_CONFIG, dict): + if "hosts" in settings.DJANGO_SOCKETS_CONFIG: + return PubSubLayer(**settings.DJANGO_SOCKETS_CONFIG) + return None + + def __warn_on_not_usable__(self): + """ + Warn the user if the broadcaster is not usable + """ + if not self.__usable__: + logger.log(logging.ERROR, "No hosts provided in settings.DJANGO_SOCKETS_CONFIG. Broadcasting / Subscribing is not possible.") + + # Sync Functions + def broadcast(self, channel:str, data:[dict|list]): + """ + Broadcast data to a specific channel or all channels that this socket server is subscribed to. + + Requires: + + - channel: str = The channel to broadcast the data to + - data: [dict|list] = The data to broadcast to the channel + - Note: This data must be JSON serializable + """ + asyncio.run_coroutine_threadsafe(self.async_broadcast(channel, data), self.__loop__) + + def subscribe(self, channel:str): + """ + Subscribe to a channel to receive data from broadcasts + + Requires: + + - channel: str = The channel to subscribe to + """ + asyncio.run_coroutine_threadsafe(self.async_subscribe(channel), self.__loop__) + + # Async Functions + async def async_broadcast(self, channel:str, data): + """ + Broadcast data to a channel where all relevant clients will receive the data + and send it to the client + + Requires: + + - channel: str = The channel to broadcast the data to + - data: [dict|list] = The data to broadcast to the channel + - Note: This data must be JSON serializable + """ + self.__warn_on_not_usable__() + await self.pubsub_layer.send(str(channel), data) + + + async def async_subscribe(self, channel:str): + """ + Subscribe to a channel to receive data from broadcasts + + Requires: + + - channel: str = The channel to subscribe to + """ + self.__warn_on_not_usable__() + await self.pubsub_layer.subscribe(str(channel)) + + async def async_receive_broadcast(self): + """ + Receive a broadcast from a channel that this socket server is subscribed to + + Returns: + + - channel: str = The channel that the data was broadcasted to + - data: [dict|list] = The data that was broadcasted to the channel + - Note: This data must be JSON serializable + """ + data = await self.pubsub_layer.receive() + return data['channel'], data['data'] diff --git a/cave_core/websockets/django_sockets/middleware.py b/cave_core/websockets/django_sockets/middleware.py new file mode 100644 index 0000000..c18ca0f --- /dev/null +++ b/cave_core/websockets/django_sockets/middleware.py @@ -0,0 +1,39 @@ +from django.contrib.auth.models import AnonymousUser +from .utils import database_sync_to_async + +def drf_token_obj(): + try: + return Token + except: + from rest_framework.authtoken.models import Token + return Token + + +@database_sync_to_async +def get_drf_user(user_token): + if user_token is not None: + try: + token = drf_token_obj().objects.get(key=user_token) + return token.user + except: + pass + return AnonymousUser() + + +def get_query_arg(query_string, arg, default=None): + try: + return (dict((x.split("=") for x in query_string.decode().split("&")))).get(arg, default) + except: + return None + + +class DRFTokenAuthMiddleware: + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + scope = dict(scope) + scope["user"] = await get_drf_user(get_query_arg(scope["query_string"], "user_token")) + return await self.app(scope, receive, send) + + diff --git a/cave_core/websockets/django_sockets/pubsub.py b/cave_core/websockets/django_sockets/pubsub.py new file mode 100644 index 0000000..1813e23 --- /dev/null +++ b/cave_core/websockets/django_sockets/pubsub.py @@ -0,0 +1,268 @@ +import asyncio, logging, binascii, msgpack, logging +from redis.asyncio import Redis, ConnectionPool, sentinel + +logger = logging.getLogger(__name__) + +class PubSubLayer: + """ + A meta PubSub Layer that uses Redis's pub/sub functionality and allows multiple subscriptions to + different channels that may or may not be on different cache shards. + + This layer is designed to be used with asyncio and is not necessarily thread-safe. + """ + def __init__( + self, + hosts=None, + ): + """ + Initialize the PubSub Layer + + Requires: + - hosts: A list of dictionaries + - Note: This uses a redis connection pool or a sentinel connection pool to connect to the Redis server. + - Each dictionary should contain the following key value pairs (depending on the connection pool type): + - ConnectionPool: + - host: The host of the Redis server + - port: The port of the Redis server + - Other keys listed in the redis-py ConnectionPool documentation + - SentinelConnectionPool: + - master_name: The name of the master in a Redis Sentinel setup + - sentinels: A list of sentinel addresses + - sentinel_kwargs: A dictionary of keyword arguments to pass to the sentinel connect + - Other keys listed in the redis-py SentinelConnectionPool documentation + """ + self.queue = asyncio.Queue() + self.subscriptions = dict() + if not isinstance(hosts, list): + raise ValueError("Hosts must be a list of dictionaries") + if len(hosts) == 0: + raise ValueError("Hosts must contain at least one dictionary") + self.shards = [ShardConnection(host, self) for host in hosts] + + # PubSub methods + async def subscribe(self, channel:str): + """ + Subscribe to a channel + """ + shard = self.__get_shard__(channel) + if channel not in self.subscriptions: + self.subscriptions[channel] = shard + await shard.subscribe(channel) + + async def unsubscribe(self, channel:str): + """ + Unsubscribe from a channel + """ + if channel in self.subscriptions: + shard = self.subscriptions.pop(channel) + await shard.unsubscribe(channel) + + async def send(self, channel:str, data): + """ + Send data to a channel + """ + shard = self.__get_shard__(channel) + await shard.publish(channel, data) + + + async def receive(self) -> dict|None: + """ + Get the next item from the queue. This will hang until an item is available. + + If the queue has been closed, cleanup and raise an exception to exit the calling task. + + Format: + { + 'channel': str, + 'data': Any + } + """ + try: + return await self.queue.get() + except (asyncio.CancelledError, asyncio.TimeoutError, GeneratorExit): + # Cleanup / unsubscribe on interruptions / exits / timeouts + await self.flush() + # Raise an exception to exit the calling task + raise + + async def flush(self): + """ + Flush the layer and close all connections. + """ + for shard in set(self.subscriptions.values()): + try: + await shard.flush() + except BaseException: + logger.exception("Exception while flushing shard connection") + self.subscriptions=dict() + + + # Utility Methods + def __get_shard__(self, channel): + """ + Return the shard that is used for this channel. + + This is done by assigning a shard index location based on the CRC32 of the channel name. + """ + if len(self.shards) == 1: + shard_index = 0 + else: + hash_val = binascii.crc32(channel.encode("utf8")) & 0xFFF + shard_index = int(hash_val / (4096 / float(len(self.shards)))) + return self.shards[shard_index] + + +class ShardConnection: + def __init__(self, host, pubsub_layer_obj, prefix="pubsub"): + self.connection_pool = self.__get_connection_pool__(host) + self.pubsub_layer_obj = pubsub_layer_obj + self.lock = asyncio.Lock() + self.connection = None + self.pubsub = None + self.receiver_task = None + self.subscriptions = set() + self.prefix = prefix + + # PubSub methods + async def subscribe(self, channel): + channel = self.__get_channel_name__(channel) + async with self.lock: + if channel in self.subscriptions: + return + await self.__ensure_connection__() + await self.pubsub.subscribe(channel) + self.subscriptions.add(channel) + # Drop out of the lock to start the receiver task which requires the lock to be released + await self.ensure_receiver_task() + + async def unsubscribe(self, channel): + channel = self.__get_channel_name__(channel) + async with self.lock: + if channel not in self.subscriptions: + return + await self.__ensure_connection__() + await self.pubsub.unsubscribe(channel) + self.subscriptions.remove(channel) + if len(self.subscriptions) == 0: + self.flush() + + async def publish(self, channel, message): + channel = self.__get_channel_name__(channel) + async with self.lock: + await self.__ensure_connection__() + await self.connection.publish(channel, self.__serialize__(message)) + + async def ensure_receiver_task(self): + async with self.lock: + if self.receiver_task is None: + self.receiver_task = asyncio.create_task(self.__receiver_task__()) + # This is needed to continue the main coroutine execution after create_task + await asyncio.sleep(0) + + async def flush(self): + # Flushing is not locked since it can be called from inside the lock + if self.receiver_task: + self.receiver_task.cancel() + try: + await self.receiver_task + except asyncio.CancelledError: + pass + self.receiver_task = None + if self.pubsub: + self.pubsub.close() + self.pubsub = None + if self.connection: + self.connection.close() + self.connection = None + + # Tasks + async def __receiver_task__(self): + """ + Start a task to receive messages from the pubsub and put them in the queue + + This task will run until all subscriptions are removed. + + It will loop continuously as awaiting the pubsub.get_message will not hang the event loop. + """ + # print("RECEIVER TASK STARTING") + while len(self.subscriptions) > 0: + try: + # Make sure pubsub is active and subscribed otherwise wait for subscription to be established + if self.pubsub and self.pubsub.subscribed: + # Get messages from the pubsub + message = await self.pubsub.get_message(ignore_subscribe_messages=True) + # If message is not None, put it in the channel queue + if message: + self.pubsub_layer_obj.queue.put_nowait({ + 'channel':self.__get_channel_from_name__(message["channel"].decode()), + 'data':self.__deserialize__(message["data"]) + }) + # Wait for a short time to prevent busy waiting + # This also serves to wait for the pubsub layer to be subscribed to the channel + await asyncio.sleep(0.1) + # Exit on cancellation, timeout, or generator exit (for cleanup afer connection is closed) + except (asyncio.CancelledError,asyncio.TimeoutError,GeneratorExit): + # print("RECEIVER TASK KILLED") + self.flush() + raise + except: + logger.exception("Exception while receiving message from pubsub") + await self.flush() + + # Utility Methods + async def __ensure_connection__(self): + """ + Ensure that the connection to the cache is established. + + Note: This should only be called within a lock. + """ + if not self.connection: + self.connection = Redis(connection_pool=self.connection_pool) + self.pubsub = self.connection.pubsub() + + def __get_channel_name__(self, channel): + """ + Get the channel name with the prefix. + """ + return f"{self.prefix}.{channel}" + + def __get_channel_from_name__(self, channel_name): + """ + Get the channel name without the prefix. + """ + return channel_name[len(self.prefix)+1:] + + def __serialize__(self, message): + """ + Serialize a message into bytes. + """ + return msgpack.packb(message) + + def __deserialize__(self, message): + """ + Deserialize a message from bytes. + """ + return msgpack.unpackb(message, strict_map_key=False) + + @staticmethod + def __get_connection_pool__(host:dict): + """ + Get a connection pool from a host dictionary + """ + host = host.copy() + if "address" in host: + address = host.pop("address") + return ConnectionPool.from_url(address, **host) + + master_name = host.pop("master_name", None) + if master_name is not None: + sentinels = host.pop("sentinels") + sentinel_kwargs = host.pop("sentinel_kwargs", None) + return sentinel.SentinelConnectionPool( + master_name, + sentinel.Sentinel(sentinels, sentinel_kwargs=sentinel_kwargs), + **host + ) + return ConnectionPool(**host) + + diff --git a/cave_core/websockets/django_sockets/sockets.py b/cave_core/websockets/django_sockets/sockets.py new file mode 100644 index 0000000..bda5016 --- /dev/null +++ b/cave_core/websockets/django_sockets/sockets.py @@ -0,0 +1,130 @@ +import json, asyncio, logging +from .broadcaster import Broadcaster +from .utils import run_in_thread + +logger = logging.getLogger(__name__) + +class BaseSocketServer(Broadcaster): + def __init__(self, scope, receive, send): + self.__scope__ = scope + self.__receive__ = receive + self.__send__ = send + self.is_alive = True + super().__init__() + + # Sync Functions + def send(self, data:[dict|list|str|float|int]): + """ + Send data to the websocket client. + - Note: This only sends data to the client from which the calling function was called + - Note: To send data to all clients that are subscribed to a channel, use the broadcast method + which is inherited from the Broadcaster class + + Requires: + + - data: [dict|list|str|float|int] = The data to send to the client + - Note: This data must be JSON serializable + """ + asyncio.run_coroutine_threadsafe(self.__async_send__(data), self.__loop__) + + # Async Functions + async def __async_send__(self, data:[dict|list|str|float|int]): + """ + Send data to the websocket client. + - Note: In general, this should not be called directly, but data should be broadcasted + and handled by __broadcast_listener_task__ instead + + Requires: + + - data: [dict|list|str|float|int] = The data to send to the client + - Note: This data must be JSON serializable + """ + try: + json_data = json.dumps(data) + except: + raise ValueError("Data must be JSON serializable") + await self.__send__({'type': 'websocket.send', 'text': json_data}) + + # Tasks + async def __ws_listener_task(self): + """ + Listen for incoming WS data and handle it accordingly + """ + while self.is_alive: + data = await self.__receive__() + if data['type'] == 'websocket.receive': + try: + data_text = json.loads(data['text']) + run_in_thread(self.receive, data_text) + except: + logger.exception("Invalid JSON data received") + elif data['type'] == 'websocket.disconnect': + self.__kill__() + elif data['type'] == 'websocket.connect': + await self.__send__({'type': 'websocket.accept'}) + self.connect() + else: + raise ValueError(f"Invalid WS data type: {data['type']}") + + async def __broadcast_listener_task__(self): + """ + Handle all messages that were broadcast to subscribed channels + """ + # Only handle broadcasts if the broadcaster is usable + if self.__usable__: + while self.is_alive: + channel, data = await self.async_receive_broadcast() + await self.__async_send__(data) + + # Lifecycle Methods + def __kill__(self): + """ + Kill the socket server and stop all tasks + """ + self.is_alive = False + + # Utility Methods + @classmethod + def as_asgi(cls): + """ + Return an ASGI application that can be run by daphne or other ASGI servers + + This method is a class method so that it can be called as a pure function by an un-instantiated + class which is required by the daphne server + """ + async def app(scope, receive, send): + # Wrap in a try / except block to catch unclean exits handled by Daphne + try: + # Initialize the socket server object + socket_server = cls(scope, receive, send) + # Create Tasks for the listener and queue processor + ws_listener_task = asyncio.create_task(socket_server.__ws_listener_task()) + broadcast_listener_task = asyncio.create_task(socket_server.__broadcast_listener_task__()) + # wait until the socket server is killed or the tasks are cancelled + while socket_server.is_alive: + await asyncio.sleep(0.2) + + # Catch exits handled by Daphne and allow the tasks to be cancelled + except asyncio.CancelledError: + pass + # Ensure all tasks are cancelled + ws_listener_task.cancel() + broadcast_listener_task.cancel() + # Allow the cancelation to run in the background + # The next line allows the async function to return + # and the above tasks to be cancelled in the background + await asyncio.sleep(0) + return app + + # Placeholder Methods + def receive(self, data): + """ + Placeholder method for the receive method that must be overwritten by the user + """ + raise NotImplementedError("The receive method must be implemented by the user") + + def connect(self): + """ + Placeholder method for the connect method that can be overwritten by the user. + """ + pass \ No newline at end of file diff --git a/cave_core/websockets/django_sockets/utils.py b/cave_core/websockets/django_sockets/utils.py new file mode 100644 index 0000000..8a843f2 --- /dev/null +++ b/cave_core/websockets/django_sockets/utils.py @@ -0,0 +1,167 @@ +from django.db import close_old_connections +from django.core.exceptions import ImproperlyConfigured +from django.urls.exceptions import Resolver404 +from django.urls.resolvers import RegexPattern, RoutePattern, URLResolver + +import threading, asyncio, logging +from asgiref.sync import SyncToAsync + +logger = logging.getLogger(__name__) + +def run_in_thread(command, *args, **kwargs): + """ + Takes in a synchronous command along with args and kwargs and runs it in a background + thread that is not tied to the websocket connection. + + This will be terminated when the larger daphne server is terminated + """ + thread = threading.Thread(target=command, args=args, kwargs=kwargs, daemon=True) + thread.start() + return thread + +def start_event_loop_thread(loop): + """ + Starts the event loop in a new thread + """ + asyncio.set_event_loop(loop) + loop.run_forever() + +def ensure_loop_running(loop=None): + """ + Starts the event loop in a new thread and returns the thread + """ + loop = loop if loop is not None else asyncio.get_event_loop() + if not loop.is_running(): + try: + thread = run_in_thread(start_event_loop_thread, loop) + except: + logger.log(logging.ERROR, "Event Loop already running") + return loop + +# The following code is copied directly from Django Channels (channels/db.py) +# Begin Code Copy: +################################################################################ + +class DatabaseSyncToAsync(SyncToAsync): + """ + SyncToAsync version that cleans up old database connections when it exits. + """ + + def thread_handler(self, loop, *args, **kwargs): + close_old_connections() + try: + return super().thread_handler(loop, *args, **kwargs) + finally: + close_old_connections() + +# The class is TitleCased, but we want to encourage use as a callable/decorator +database_sync_to_async = DatabaseSyncToAsync +################################################################################ +# End Code Copy: + + +# The following code is copied directly from Django Channels (channels/routing.py) +# Begin Code Copy: +################################################################################ +class ProtocolTypeRouter: + """ + Takes a mapping of protocol type names to other Application instances, + and dispatches to the right one based on protocol name (or raises an error) + """ + + def __init__(self, application_mapping): + self.application_mapping = application_mapping + + async def __call__(self, scope, receive, send): + if scope["type"] in self.application_mapping: + application = self.application_mapping[scope["type"]] + return await application(scope, receive, send) + else: + raise ValueError( + "No application configured for scope type %r" % scope["type"] + ) + +class URLRouter: + """ + Routes to different applications/consumers based on the URL path. + + Works with anything that has a ``path`` key, but intended for WebSocket + and HTTP. Uses Django's django.urls objects for resolution - + path() or re_path(). + """ + + #: This router wants to do routing based on scope[path] or + #: scope[path_remaining]. ``path()`` entries in URLRouter should not be + #: treated as endpoints (ended with ``$``), but similar to ``include()``. + _path_routing = True + + def __init__(self, routes): + self.routes = routes + + for route in self.routes: + # The inner ASGI app wants to do additional routing, route + # must not be an endpoint + if getattr(route.callback, "_path_routing", False) is True: + pattern = route.pattern + if isinstance(pattern, RegexPattern): + arg = pattern._regex + elif isinstance(pattern, RoutePattern): + arg = pattern._route + else: + raise ValueError(f"Unsupported pattern type: {type(pattern)}") + route.pattern = pattern.__class__(arg, pattern.name, is_endpoint=False) + + if not route.callback and isinstance(route, URLResolver): + raise ImproperlyConfigured( + "%s: include() is not supported in URLRouter. Use nested" + " URLRouter instances instead." % (route,) + ) + + async def __call__(self, scope, receive, send): + # Get the path + path = scope.get("path_remaining", scope.get("path", None)) + if path is None: + raise ValueError("No 'path' key in connection scope, cannot route URLs") + + if "path_remaining" not in scope: + # We are the outermost URLRouter, so handle root_path if present. + root_path = scope.get("root_path", "") + if root_path and not path.startswith(root_path): + # If root_path is present, path must start with it. + raise ValueError("No route found for path %r." % path) + path = path[len(root_path) :] + + # Remove leading / to match Django's handling + path = path.lstrip("/") + # Run through the routes we have until one matches + for route in self.routes: + try: + match = route.pattern.match(path) + if match: + new_path, args, kwargs = match + # Add defaults to kwargs from the URL pattern. + kwargs.update(route.default_args) + # Add args or kwargs into the scope + outer = scope.get("url_route", {}) + application = route.callback + return await application( + dict( + scope, + path_remaining=new_path, + url_route={ + "args": outer.get("args", ()) + args, + "kwargs": {**outer.get("kwargs", {}), **kwargs}, + }, + ), + receive, + send, + ) + except Resolver404: + pass + else: + if "path_remaining" in scope: + raise Resolver404("No route found for path %r." % path) + # We are the outermost URLRouter + raise ValueError("No route found for path %r." % path) +################################################################################ +# End Code Copy: \ No newline at end of file diff --git a/cave_core/websockets/middleware.py b/cave_core/websockets/middleware.py deleted file mode 100644 index ca8154b..0000000 --- a/cave_core/websockets/middleware.py +++ /dev/null @@ -1,28 +0,0 @@ -from django.contrib.auth.models import AnonymousUser -from rest_framework.authtoken.models import Token -from channels.db import database_sync_to_async -from channels.middleware import BaseMiddleware - - -@database_sync_to_async -def get_user(user_token): - if user_token is not None: - try: - token = Token.objects.get(key=user_token) - return token.user - except: - pass - return AnonymousUser() - - -def get_query_arg(query_string, arg, default=None): - try: - return (dict((x.split("=") for x in query_string.decode().split("&")))).get(arg, default) - except: - return None - - -class TokenAuthMiddleware(BaseMiddleware): - async def __call__(self, scope, receive, send): - scope["user"] = await get_user(get_query_arg(scope["query_string"], "user_token")) - return await super().__call__(scope, receive, send) diff --git a/cave_core/websockets/socket_server.py b/cave_core/websockets/socket_server.py new file mode 100644 index 0000000..644c9c6 --- /dev/null +++ b/cave_core/websockets/socket_server.py @@ -0,0 +1,24 @@ +from django.conf import settings + +from .commands import get_command +from .django_sockets.sockets import BaseSocketServer + +class Request: + """ + A simple request object class to mimic the behavior of the request object passed by DRF + """ + def __init__(self, user, data): + self.data = data + self.user = user + +class SocketServer(BaseSocketServer): + def receive(self, data): + if settings.DEBUG: + print("WS RECEIVE ", data['command']) + request = Request(self.__scope__.get("user"), data.get("data")) + command = get_command(data.get("command")) + command(request) + + def connect(self): + self.channel_id = str(self.__scope__.get("user").id) + self.subscribe(self.channel_id) diff --git a/cave_core/websockets/urls.py b/cave_core/websockets/urls.py deleted file mode 100644 index a018cc0..0000000 --- a/cave_core/websockets/urls.py +++ /dev/null @@ -1,6 +0,0 @@ -from django.urls import path -from .consumer import WebsocketConsumer - -websocket_urlpatterns = [ - path("ws/", WebsocketConsumer.as_asgi()), -] From 44c502a0c55bd30ebd9b0e03ac6130a06e8c54f3 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Fri, 8 Nov 2024 10:50:34 -0500 Subject: [PATCH 02/25] Extend functionaility / scalability / docs / examples. --- cave_app/settings/development.py | 15 +- cave_core/management/commands/cache_test.py | 52 ++++- cave_core/websockets/app.py | 2 +- .../websockets/django_sockets/broadcaster.py | 42 ++-- cave_core/websockets/django_sockets/pubsub.py | 44 +++-- .../websockets/django_sockets/sockets.py | 180 ++++++++++++------ cave_core/websockets/django_sockets/utils.py | 20 +- cave_core/websockets/socket_server.py | 4 +- requirements.txt | 3 +- 9 files changed, 241 insertions(+), 121 deletions(-) diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index f75b564..51fbbb0 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -158,22 +158,9 @@ ################################################################ -# Django Channels -################################################################ -## Channels Layer Support -INSTALLED_APPS = ["daphne"] + INSTALLED_APPS -CHANNEL_LAYERS = { - 'default': { - 'BACKEND': 'channels_redis.core.RedisChannelLayer', - 'CONFIG': { - "hosts": [f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}"], - }, - }, -} -################################################################ - # DJANGO_SOCKETS_CONFIG ################################################################ +INSTALLED_APPS = ["daphne"] + INSTALLED_APPS DJANGO_SOCKETS_CONFIG = { "hosts": [ {"address": f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}"} diff --git a/cave_core/management/commands/cache_test.py b/cave_core/management/commands/cache_test.py index a9b762b..4e167e9 100644 --- a/cave_core/management/commands/cache_test.py +++ b/cave_core/management/commands/cache_test.py @@ -1,21 +1,55 @@ from django.core.management.base import BaseCommand from cave_core.utils.cache import Cache - from cave_core.websockets.django_sockets.sockets import BaseSocketServer -from asgiref.sync import async_to_sync +import asyncio, time -class Socket_Server(BaseSocketServer): +class CustomSocketServer(BaseSocketServer): def receive(self, data): - print('Receiving:', data) + print("WS RECEIVED: ", data) + print(f"BROADCASTING TO '{self.scope['username']}'") + self.broadcast(self.scope['username'], data) -class Command(BaseCommand): - help = 'Clearing the Cache' + def connect(self): + print(f"CONNECTING TO '{self.scope['username']}'") + self.subscribe(self.scope['username']) + +async def send(ws_data): + # Print the first 128 characters of the data being sent + print("WS SENDING:", str(ws_data)[:128]) +class Command(BaseCommand): def handle(self, *args, **options): + # Example of using the basic cache object cache = Cache() cache.set("test", "test value") print(cache.get("test")) - socket_server = Socket_Server.as_asgi()(None, None, None) - socket_server.subscribe("test_channel") - socket_server.broadcast("test_channel", "test message") + # Test the socket server cache process + base_receive = asyncio.Queue() + base_socket_server = BaseSocketServer(scope={}, receive=base_receive, send=send) + base_socket_server.start_listeners() + base_socket_server.subscribe("test_channel") + base_socket_server.broadcast("test_channel", {f"data{i}": f"test message {i}" for i in range(1024*256)}) + # Give the async functions a small amount of time to complete + time.sleep(3) + + + # Example of how to use your own socket server + # Create a custom_receive queue to simulate receiving messages from a websocket client + custom_receive = asyncio.Queue() + # Create a custom socket server with a scope of {'username':'adam'} + custom_socket_server = CustomSocketServer(scope={'username':'adam'}, receive=custom_receive.get, send=send) + # Start the listeners for the custom socket server + custom_socket_server.start_listeners() + # Simulate a connection request + # This will first fire a websocket.accept message back to the client + # Then this will call the connect method which is defined above to subscribe to the test_channel + custom_receive.put_nowait({'type': 'websocket.connect'}) + # Give the async functions a small amount of time to complete + time.sleep(.5) + # Simulate a message being received from a WS client + # This will call the receive method which is defined above + # - The receive method above will then broadcast that same message to the test_channel + custom_receive.put_nowait({'type': 'websocket.receive', 'text': '{"data": "test data"}'}) + # Give the async functions a small amount of time to complete + time.sleep(.5) diff --git a/cave_core/websockets/app.py b/cave_core/websockets/app.py index e63cc15..78c4132 100644 --- a/cave_core/websockets/app.py +++ b/cave_core/websockets/app.py @@ -4,7 +4,7 @@ from .socket_server import SocketServer websocket_urlpatterns = [ - path("ws/", SocketServer.as_asgi()), + path("ws/", SocketServer.as_asgi), ] def get_ws_asgi_application(): diff --git a/cave_core/websockets/django_sockets/broadcaster.py b/cave_core/websockets/django_sockets/broadcaster.py index 3b66e40..c4461e8 100644 --- a/cave_core/websockets/django_sockets/broadcaster.py +++ b/cave_core/websockets/django_sockets/broadcaster.py @@ -1,35 +1,16 @@ import asyncio, logging -from django.conf import settings - from .pubsub import PubSubLayer -from .utils import ensure_loop_running +from .utils import ensure_loop_running, get_config logger = logging.getLogger(__name__) class Broadcaster: - def __init__(self, *args, loop=None, **kwargs): + def __init__(self, *args, loop=None, config=None, **kwargs): self.__loop__ = ensure_loop_running(loop) - self.pubsub_layer = self.__get_pubsub_layer__() + self.pubsub_layer = self.__get_pubsub_layer__(config=config) self.__usable__ = self.pubsub_layer is not None - def __get_pubsub_layer__(self): - """ - A method to get the PubSubLayer given the settings.DJANGO_SOCKETS_CONFIG - """ - if hasattr(settings, "DJANGO_SOCKETS_CONFIG"): - if isinstance(settings.DJANGO_SOCKETS_CONFIG, dict): - if "hosts" in settings.DJANGO_SOCKETS_CONFIG: - return PubSubLayer(**settings.DJANGO_SOCKETS_CONFIG) - return None - - def __warn_on_not_usable__(self): - """ - Warn the user if the broadcaster is not usable - """ - if not self.__usable__: - logger.log(logging.ERROR, "No hosts provided in settings.DJANGO_SOCKETS_CONFIG. Broadcasting / Subscribing is not possible.") - # Sync Functions def broadcast(self, channel:str, data:[dict|list]): """ @@ -92,3 +73,20 @@ async def async_receive_broadcast(self): """ data = await self.pubsub_layer.receive() return data['channel'], data['data'] + + # Internal Methods + def __get_pubsub_layer__(self, config=None): + """ + A method to get the PubSubLayer given the settings.DJANGO_SOCKETS_CONFIG + """ + config = get_config(config) + if "hosts" in config: + return PubSubLayer(**config) + return None + + def __warn_on_not_usable__(self): + """ + Warn the user if the broadcaster is not usable + """ + if not self.__usable__: + logger.log(logging.ERROR, "No hosts provided in settings.DJANGO_SOCKETS_CONFIG. Broadcasting / Subscribing is not possible.") \ No newline at end of file diff --git a/cave_core/websockets/django_sockets/pubsub.py b/cave_core/websockets/django_sockets/pubsub.py index 1813e23..54daade 100644 --- a/cave_core/websockets/django_sockets/pubsub.py +++ b/cave_core/websockets/django_sockets/pubsub.py @@ -1,4 +1,4 @@ -import asyncio, logging, binascii, msgpack, logging +import asyncio, logging, binascii, msgpack, logging, uuid from redis.asyncio import Redis, ConnectionPool, sentinel logger = logging.getLogger(__name__) @@ -22,8 +22,13 @@ def __init__( - Note: This uses a redis connection pool or a sentinel connection pool to connect to the Redis server. - Each dictionary should contain the following key value pairs (depending on the connection pool type): - ConnectionPool: + - address: The address of the Redis server + - EG: 'redis://localhost:6379' + - Note: This can be used alone or in conjunction with other connection pool keys - host: The host of the Redis server + - Note: This must be used with the port key and can be used in conjunction with the other connection pool keys - port: The port of the Redis server + - Note: This must be used with the host key and can be used in conjunction with the other connection pool keys - Other keys listed in the redis-py ConnectionPool documentation - SentinelConnectionPool: - master_name: The name of the master in a Redis Sentinel setup @@ -83,7 +88,7 @@ async def receive(self) -> dict|None: # Cleanup / unsubscribe on interruptions / exits / timeouts await self.flush() # Raise an exception to exit the calling task - raise + raise asyncio.CancelledError async def flush(self): """ @@ -92,8 +97,10 @@ async def flush(self): for shard in set(self.subscriptions.values()): try: await shard.flush() - except BaseException: - logger.exception("Exception while flushing shard connection") + except asyncio.CancelledError: + raise asyncio.CancelledError + except BaseException as e: + logger.exception(f"Exception while flushing shard connection: {e}") self.subscriptions=dict() @@ -143,14 +150,22 @@ async def unsubscribe(self, channel): await self.__ensure_connection__() await self.pubsub.unsubscribe(channel) self.subscriptions.remove(channel) - if len(self.subscriptions) == 0: - self.flush() + if len(self.subscriptions) == 0: + await self.flush() async def publish(self, channel, message): channel = self.__get_channel_name__(channel) async with self.lock: await self.__ensure_connection__() - await self.connection.publish(channel, self.__serialize__(message)) + message = self.__serialize__(message) + # if the message is larger than 1MB, then save it as a uuid in the same cache and send the uuid + # This helps bypass the 32 MB limit on pubsub queue size for most cache servers + # Ensure that this objeect times out after 60s to keep the cache clean + if len(message) > 1024*1024: + msg_loc_key = f"{self.prefix}.{str(uuid.uuid4())}" + await self.connection.set(msg_loc_key, message, ex=60) + message = self.__serialize__(f'msg:{msg_loc_key}') + await self.connection.publish(channel, message) async def ensure_receiver_task(self): async with self.lock: @@ -169,10 +184,10 @@ async def flush(self): pass self.receiver_task = None if self.pubsub: - self.pubsub.close() + await self.pubsub.close() self.pubsub = None if self.connection: - self.connection.close() + await self.connection.close() self.connection = None # Tasks @@ -193,9 +208,15 @@ async def __receiver_task__(self): message = await self.pubsub.get_message(ignore_subscribe_messages=True) # If message is not None, put it in the channel queue if message: + message_data = self.__deserialize__(message["data"]) + # If the message was too large, then get that message from the cache + if isinstance(message_data, str): + if message_data.startswith('msg:'): + msg_loc_key = message_data[4:] + message_data = self.__deserialize__(await self.connection.get(msg_loc_key)) self.pubsub_layer_obj.queue.put_nowait({ 'channel':self.__get_channel_from_name__(message["channel"].decode()), - 'data':self.__deserialize__(message["data"]) + 'data': message_data }) # Wait for a short time to prevent busy waiting # This also serves to wait for the pubsub layer to be subscribed to the channel @@ -203,8 +224,7 @@ async def __receiver_task__(self): # Exit on cancellation, timeout, or generator exit (for cleanup afer connection is closed) except (asyncio.CancelledError,asyncio.TimeoutError,GeneratorExit): # print("RECEIVER TASK KILLED") - self.flush() - raise + raise asyncio.CancelledError except: logger.exception("Exception while receiving message from pubsub") await self.flush() diff --git a/cave_core/websockets/django_sockets/sockets.py b/cave_core/websockets/django_sockets/sockets.py index bda5016..1fe2ac1 100644 --- a/cave_core/websockets/django_sockets/sockets.py +++ b/cave_core/websockets/django_sockets/sockets.py @@ -5,12 +5,33 @@ logger = logging.getLogger(__name__) class BaseSocketServer(Broadcaster): - def __init__(self, scope, receive, send): - self.__scope__ = scope + def __init__(self, scope, receive, send, config=None): + """ + Initialize the socket server + + Required: + + - scope: dict = The scope of the websocket connection + - receive: method = The `get` method for an asyncio.Queue() object that will be used to receive data from the websocket client + - send: async callable = The function that should send the data to the websocket client + - Note: This function takes in a dictionary with the following keys: + - type: str = The type of message to send (always 'websocket.send') + - text: str = The data to send to the client (the json serialized version of the data sent / broadcasted) + + Optional: + + - config: dict = The configuration for the socket server + - hosts: list = A list of dictionaries that contain the host information for the socket server + - See: django_sockets.pubsub.PubSubLayer docs for more more comprehensive docs on the config parameter + - Default: + - If provided in django settings: settings.DJANGO_SOCKETS_CONFIG + - Else: {'hosts': [{'address': 'redis://localhost:6379'}]} + """ + self.scope = scope self.__receive__ = receive self.__send__ = send self.is_alive = True - super().__init__() + super().__init__(config=config) # Sync Functions def send(self, data:[dict|list|str|float|int]): @@ -25,46 +46,77 @@ def send(self, data:[dict|list|str|float|int]): - data: [dict|list|str|float|int] = The data to send to the client - Note: This data must be JSON serializable """ - asyncio.run_coroutine_threadsafe(self.__async_send__(data), self.__loop__) + self.run_async(self.async_send(data)) + + def run_async(self, func): + """ + Run an async function in the current object's event loop + """ + asyncio.run_coroutine_threadsafe(func, self.__loop__) # Async Functions - async def __async_send__(self, data:[dict|list|str|float|int]): + async def async_send(self, data:[dict|list|str|float|int]): """ Send data to the websocket client. - - Note: In general, this should not be called directly, but data should be broadcasted - and handled by __broadcast_listener_task__ instead + - Note: To send data to all clients that are subscribed to a channel, use the broadcast method + which is inherited from the Broadcaster class Requires: - data: [dict|list|str|float|int] = The data to send to the client - Note: This data must be JSON serializable """ - try: - json_data = json.dumps(data) - except: - raise ValueError("Data must be JSON serializable") - await self.__send__({'type': 'websocket.send', 'text': json_data}) + if self.__send__ is None: + logger.log(logging.ERROR, "The send and async_send functions are not available because the send parameter was not provided when the socket server was initialized. To silence this warning, you can provide a function that simulates some sending behavior.") + else: + try: + json_data = json.dumps(data) + except: + raise ValueError("Data must be JSON serializable") + await self.__send__({'type': 'websocket.send', 'text': json_data}) + + async def async_handle_received_broadcast(self, channel:str, data:[dict|list|str|float|int]): + """ + Handle a received broadcast from a subscribed channel + + This method is provided so that it can be overwritten by the user if they want to handle + received broadcasts from subscriptions in a specific way. + + By default, this method will send the data to the client using the async_send method + + In general, this method should only be called by the __broadcast_listener_task__ method + + Requires: + + - channel: str = The channel that the data was broadcasted to + - data: [dict|list|str|float|int] = The data that was broadcasted + - Note: This data must be JSON serializable + """ + await self.async_send(data) # Tasks - async def __ws_listener_task(self): + async def __ws_listener_task__(self): """ Listen for incoming WS data and handle it accordingly """ - while self.is_alive: - data = await self.__receive__() - if data['type'] == 'websocket.receive': - try: - data_text = json.loads(data['text']) - run_in_thread(self.receive, data_text) - except: - logger.exception("Invalid JSON data received") - elif data['type'] == 'websocket.disconnect': - self.__kill__() - elif data['type'] == 'websocket.connect': - await self.__send__({'type': 'websocket.accept'}) - self.connect() - else: - raise ValueError(f"Invalid WS data type: {data['type']}") + if self.__receive__ is None: + logger.log(logging.ERROR, "The websocket listener task is not available because the receive parameter was not provided when the socket server was initialized. To silence this warning, you can provide an asyncio.Queue() receive parameter and put items in it to simulate received ws messages.") + else: + while self.is_alive: + data = await self.__receive__() + if data['type'] == 'websocket.receive': + try: + data_text = json.loads(data['text']) + run_in_thread(self.receive, data_text) + except: + logger.exception("Invalid JSON data received") + elif data['type'] == 'websocket.disconnect': + self.__kill__() + elif data['type'] == 'websocket.connect': + await self.__send__({'type': 'websocket.accept'}) + self.connect() + else: + raise ValueError(f"Invalid WS data type: {data['type']}") async def __broadcast_listener_task__(self): """ @@ -73,8 +125,14 @@ async def __broadcast_listener_task__(self): # Only handle broadcasts if the broadcaster is usable if self.__usable__: while self.is_alive: - channel, data = await self.async_receive_broadcast() - await self.__async_send__(data) + try: + channel, data = await self.async_receive_broadcast() + await self.async_handle_received_broadcast(channel, data) + # Cleanup on exit + except asyncio.CancelledError: + raise asyncio.CancelledError + except Exception as e: + raise e # Lifecycle Methods def __kill__(self): @@ -82,44 +140,50 @@ def __kill__(self): Kill the socket server and stop all tasks """ self.is_alive = False - + + async def async_start_listeners(self): + try: + # Create Tasks for the listener and queue processor + ws_listener_task = asyncio.create_task(self.__ws_listener_task__()) + broadcast_listener_task = asyncio.create_task(self.__broadcast_listener_task__()) + # wait until the socket server is killed or the tasks are cancelled + while self.is_alive: + await asyncio.sleep(0.2) + # Catch exits handled by Daphne and allow the tasks to be cancelled + except asyncio.CancelledError: + pass + # Ensure all tasks are cancelled + ws_listener_task.cancel() + broadcast_listener_task.cancel() + # Allow the cancelation to run in the background + # The next line allows the async function to return + # and the above tasks to be cancelled in the background + await asyncio.sleep(0) + + def start_listeners(self): + """ + Start the listeners for the socket server + """ + self.run_async(self.async_start_listeners()) + # Utility Methods @classmethod - def as_asgi(cls): + async def as_asgi(cls, scope, receive, send): """ - Return an ASGI application that can be run by daphne or other ASGI servers + An ASGI application runner function that can be called by Daphne. - This method is a class method so that it can be called as a pure function by an un-instantiated - class which is required by the daphne server + This creates a new socket server instance and starts the listeners in the background. """ - async def app(scope, receive, send): - # Wrap in a try / except block to catch unclean exits handled by Daphne - try: - # Initialize the socket server object - socket_server = cls(scope, receive, send) - # Create Tasks for the listener and queue processor - ws_listener_task = asyncio.create_task(socket_server.__ws_listener_task()) - broadcast_listener_task = asyncio.create_task(socket_server.__broadcast_listener_task__()) - # wait until the socket server is killed or the tasks are cancelled - while socket_server.is_alive: - await asyncio.sleep(0.2) - - # Catch exits handled by Daphne and allow the tasks to be cancelled - except asyncio.CancelledError: - pass - # Ensure all tasks are cancelled - ws_listener_task.cancel() - broadcast_listener_task.cancel() - # Allow the cancelation to run in the background - # The next line allows the async function to return - # and the above tasks to be cancelled in the background - await asyncio.sleep(0) - return app + # Wrap in a try / except block to catch unclean exits handled by Daphne + socket_server = cls(scope, receive, send) + await socket_server.async_start_listeners() # Placeholder Methods def receive(self, data): """ Placeholder method for the receive method that must be overwritten by the user + + This is the method that will be called when data is received from the ws client. """ raise NotImplementedError("The receive method must be implemented by the user") diff --git a/cave_core/websockets/django_sockets/utils.py b/cave_core/websockets/django_sockets/utils.py index 8a843f2..4aba150 100644 --- a/cave_core/websockets/django_sockets/utils.py +++ b/cave_core/websockets/django_sockets/utils.py @@ -1,13 +1,31 @@ +from django.conf import settings from django.db import close_old_connections from django.core.exceptions import ImproperlyConfigured from django.urls.exceptions import Resolver404 from django.urls.resolvers import RegexPattern, RoutePattern, URLResolver -import threading, asyncio, logging +import asyncio, logging, threading from asgiref.sync import SyncToAsync logger = logging.getLogger(__name__) +def get_config(config=None): + """ + Get the configuration for the socket server + """ + # If the config is passed, return it. + if config is not None: + return config + # If the config is not passed, try to get it from the Django settings + elif hasattr(settings, 'DJANGO_SOCKETS_CONFIG'): + return settings.DJANGO_SOCKETS_CONFIG + # If nothing has been returned yet, return a default configuration + return { + "hosts": [ + {"address": "redis://localhost:6379"} + ] + } + def run_in_thread(command, *args, **kwargs): """ Takes in a synchronous command along with args and kwargs and runs it in a background diff --git a/cave_core/websockets/socket_server.py b/cave_core/websockets/socket_server.py index 644c9c6..0544038 100644 --- a/cave_core/websockets/socket_server.py +++ b/cave_core/websockets/socket_server.py @@ -15,10 +15,10 @@ class SocketServer(BaseSocketServer): def receive(self, data): if settings.DEBUG: print("WS RECEIVE ", data['command']) - request = Request(self.__scope__.get("user"), data.get("data")) + request = Request(self.scope.get("user"), data.get("data")) command = get_command(data.get("command")) command(request) def connect(self): - self.channel_id = str(self.__scope__.get("user").id) + self.channel_id = str(self.scope.get("user").id) self.subscribe(self.channel_id) diff --git a/requirements.txt b/requirements.txt index 34d367b..ddda237 100755 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,13 @@ autoflake==2.3.1 black==24.8.0 cave_utils==2.3.0 -channels==4.1.0 -channels_redis==4.2.0 daphne==4.1.2 Django==5.1.1 django-cors-headers==4.4.0 django-import-export==4.1.1 django-solo==2.3.0 djangorestframework==3.15.2 +msgpack==1.1.0 pamda>=2.5.0 pillow==10.4.0 psycopg2-binary==2.9.9 From 3dde3166776dc385885433f3b1ed0086faa13131 Mon Sep 17 00:00:00 2001 From: Willem Guter Date: Fri, 8 Nov 2024 15:21:23 -0500 Subject: [PATCH 03/25] Add ability to export non-json files --- cave_core/utils/broadcasting.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/cave_core/utils/broadcasting.py b/cave_core/utils/broadcasting.py index f54d3d2..de3188e 100755 --- a/cave_core/utils/broadcasting.py +++ b/cave_core/utils/broadcasting.py @@ -142,11 +142,11 @@ def notify( }, loading=False, ) - - @type_enforced.Enforcer + def export( self, - data: dict, + data, + name="session-data.json", ): """ Send end users a json serializable object which is downloaded by the client to the user's device @@ -156,8 +156,15 @@ def export( - `data`: - Type: dict - What: Json encodable data to send to the user + + Optional: + + - `name`: + - Type: str + - What: The name of the file to download + - Default: "session-data.json" """ self.broadcast( event="export", - data=data, + data={"data": data, "name": name}, ) From 3fd8ba92ea53ed88369c71e2c312298025bca9cb Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Fri, 8 Nov 2024 15:31:17 -0500 Subject: [PATCH 04/25] Migrate over to django_sockets. --- cave_app/asgi.py | 2 +- cave_core/management/commands/cache_test.py | 50 +-- cave_core/websockets/app.py | 4 +- cave_core/websockets/cave_ws_broadcaster.py | 2 +- .../websockets/django_sockets/__init__.py | 0 .../websockets/django_sockets/broadcaster.py | 92 ------ .../websockets/django_sockets/middleware.py | 39 --- cave_core/websockets/django_sockets/pubsub.py | 288 ------------------ .../websockets/django_sockets/sockets.py | 194 ------------ cave_core/websockets/django_sockets/utils.py | 185 ----------- cave_core/websockets/socket_server.py | 2 +- requirements.txt | 1 + 12 files changed, 17 insertions(+), 842 deletions(-) delete mode 100644 cave_core/websockets/django_sockets/__init__.py delete mode 100644 cave_core/websockets/django_sockets/broadcaster.py delete mode 100644 cave_core/websockets/django_sockets/middleware.py delete mode 100644 cave_core/websockets/django_sockets/pubsub.py delete mode 100644 cave_core/websockets/django_sockets/sockets.py delete mode 100644 cave_core/websockets/django_sockets/utils.py diff --git a/cave_app/asgi.py b/cave_app/asgi.py index 6d0a740..70a52d0 100755 --- a/cave_app/asgi.py +++ b/cave_app/asgi.py @@ -12,7 +12,7 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cave_app.settings.development") from django.core.asgi import get_asgi_application -from cave_core.websockets.django_sockets.utils import ProtocolTypeRouter +from django_sockets.utils import ProtocolTypeRouter from cave_core.websockets.app import get_ws_asgi_application # Initialize asgi app items when the app starts diff --git a/cave_core/management/commands/cache_test.py b/cave_core/management/commands/cache_test.py index 4e167e9..5f67778 100644 --- a/cave_core/management/commands/cache_test.py +++ b/cave_core/management/commands/cache_test.py @@ -1,22 +1,8 @@ from django.core.management.base import BaseCommand from cave_core.utils.cache import Cache -from cave_core.websockets.django_sockets.sockets import BaseSocketServer +from django_sockets.sockets import BaseSocketServer import asyncio, time -class CustomSocketServer(BaseSocketServer): - def receive(self, data): - print("WS RECEIVED: ", data) - print(f"BROADCASTING TO '{self.scope['username']}'") - self.broadcast(self.scope['username'], data) - - def connect(self): - print(f"CONNECTING TO '{self.scope['username']}'") - self.subscribe(self.scope['username']) - -async def send(ws_data): - # Print the first 128 characters of the data being sent - print("WS SENDING:", str(ws_data)[:128]) - class Command(BaseCommand): def handle(self, *args, **options): # Example of using the basic cache object @@ -24,32 +10,18 @@ def handle(self, *args, **options): cache.set("test", "test value") print(cache.get("test")) + async def send(ws_data): + # Print the first 128 characters of the data being sent + print("WS SENDING:", str(ws_data)[:128]) + # Test the socket server cache process base_receive = asyncio.Queue() - base_socket_server = BaseSocketServer(scope={}, receive=base_receive, send=send) + base_socket_server = BaseSocketServer(scope={}, receive=base_receive.get, send=send) base_socket_server.start_listeners() base_socket_server.subscribe("test_channel") + # Small message + base_socket_server.broadcast("test_channel", {"data": "test message"}) + time.sleep(1) + # Large message base_socket_server.broadcast("test_channel", {f"data{i}": f"test message {i}" for i in range(1024*256)}) - # Give the async functions a small amount of time to complete - time.sleep(3) - - - # Example of how to use your own socket server - # Create a custom_receive queue to simulate receiving messages from a websocket client - custom_receive = asyncio.Queue() - # Create a custom socket server with a scope of {'username':'adam'} - custom_socket_server = CustomSocketServer(scope={'username':'adam'}, receive=custom_receive.get, send=send) - # Start the listeners for the custom socket server - custom_socket_server.start_listeners() - # Simulate a connection request - # This will first fire a websocket.accept message back to the client - # Then this will call the connect method which is defined above to subscribe to the test_channel - custom_receive.put_nowait({'type': 'websocket.connect'}) - # Give the async functions a small amount of time to complete - time.sleep(.5) - # Simulate a message being received from a WS client - # This will call the receive method which is defined above - # - The receive method above will then broadcast that same message to the test_channel - custom_receive.put_nowait({'type': 'websocket.receive', 'text': '{"data": "test data"}'}) - # Give the async functions a small amount of time to complete - time.sleep(.5) + time.sleep(10) diff --git a/cave_core/websockets/app.py b/cave_core/websockets/app.py index 78c4132..9477737 100644 --- a/cave_core/websockets/app.py +++ b/cave_core/websockets/app.py @@ -1,6 +1,6 @@ -from .django_sockets.utils import URLRouter -from .django_sockets.middleware import DRFTokenAuthMiddleware from django.urls import path +from django_sockets.utils import URLRouter +from django_sockets.middleware import DRFTokenAuthMiddleware from .socket_server import SocketServer websocket_urlpatterns = [ diff --git a/cave_core/websockets/cave_ws_broadcaster.py b/cave_core/websockets/cave_ws_broadcaster.py index 10d3a6b..3f4b907 100755 --- a/cave_core/websockets/cave_ws_broadcaster.py +++ b/cave_core/websockets/cave_ws_broadcaster.py @@ -1,7 +1,7 @@ # External Imports import type_enforced -from .django_sockets.broadcaster import Broadcaster +from django_sockets.broadcaster import Broadcaster broadcaster = Broadcaster() diff --git a/cave_core/websockets/django_sockets/__init__.py b/cave_core/websockets/django_sockets/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/cave_core/websockets/django_sockets/broadcaster.py b/cave_core/websockets/django_sockets/broadcaster.py deleted file mode 100644 index c4461e8..0000000 --- a/cave_core/websockets/django_sockets/broadcaster.py +++ /dev/null @@ -1,92 +0,0 @@ -import asyncio, logging - -from .pubsub import PubSubLayer -from .utils import ensure_loop_running, get_config - -logger = logging.getLogger(__name__) - -class Broadcaster: - def __init__(self, *args, loop=None, config=None, **kwargs): - self.__loop__ = ensure_loop_running(loop) - self.pubsub_layer = self.__get_pubsub_layer__(config=config) - self.__usable__ = self.pubsub_layer is not None - - # Sync Functions - def broadcast(self, channel:str, data:[dict|list]): - """ - Broadcast data to a specific channel or all channels that this socket server is subscribed to. - - Requires: - - - channel: str = The channel to broadcast the data to - - data: [dict|list] = The data to broadcast to the channel - - Note: This data must be JSON serializable - """ - asyncio.run_coroutine_threadsafe(self.async_broadcast(channel, data), self.__loop__) - - def subscribe(self, channel:str): - """ - Subscribe to a channel to receive data from broadcasts - - Requires: - - - channel: str = The channel to subscribe to - """ - asyncio.run_coroutine_threadsafe(self.async_subscribe(channel), self.__loop__) - - # Async Functions - async def async_broadcast(self, channel:str, data): - """ - Broadcast data to a channel where all relevant clients will receive the data - and send it to the client - - Requires: - - - channel: str = The channel to broadcast the data to - - data: [dict|list] = The data to broadcast to the channel - - Note: This data must be JSON serializable - """ - self.__warn_on_not_usable__() - await self.pubsub_layer.send(str(channel), data) - - - async def async_subscribe(self, channel:str): - """ - Subscribe to a channel to receive data from broadcasts - - Requires: - - - channel: str = The channel to subscribe to - """ - self.__warn_on_not_usable__() - await self.pubsub_layer.subscribe(str(channel)) - - async def async_receive_broadcast(self): - """ - Receive a broadcast from a channel that this socket server is subscribed to - - Returns: - - - channel: str = The channel that the data was broadcasted to - - data: [dict|list] = The data that was broadcasted to the channel - - Note: This data must be JSON serializable - """ - data = await self.pubsub_layer.receive() - return data['channel'], data['data'] - - # Internal Methods - def __get_pubsub_layer__(self, config=None): - """ - A method to get the PubSubLayer given the settings.DJANGO_SOCKETS_CONFIG - """ - config = get_config(config) - if "hosts" in config: - return PubSubLayer(**config) - return None - - def __warn_on_not_usable__(self): - """ - Warn the user if the broadcaster is not usable - """ - if not self.__usable__: - logger.log(logging.ERROR, "No hosts provided in settings.DJANGO_SOCKETS_CONFIG. Broadcasting / Subscribing is not possible.") \ No newline at end of file diff --git a/cave_core/websockets/django_sockets/middleware.py b/cave_core/websockets/django_sockets/middleware.py deleted file mode 100644 index c18ca0f..0000000 --- a/cave_core/websockets/django_sockets/middleware.py +++ /dev/null @@ -1,39 +0,0 @@ -from django.contrib.auth.models import AnonymousUser -from .utils import database_sync_to_async - -def drf_token_obj(): - try: - return Token - except: - from rest_framework.authtoken.models import Token - return Token - - -@database_sync_to_async -def get_drf_user(user_token): - if user_token is not None: - try: - token = drf_token_obj().objects.get(key=user_token) - return token.user - except: - pass - return AnonymousUser() - - -def get_query_arg(query_string, arg, default=None): - try: - return (dict((x.split("=") for x in query_string.decode().split("&")))).get(arg, default) - except: - return None - - -class DRFTokenAuthMiddleware: - def __init__(self, app): - self.app = app - - async def __call__(self, scope, receive, send): - scope = dict(scope) - scope["user"] = await get_drf_user(get_query_arg(scope["query_string"], "user_token")) - return await self.app(scope, receive, send) - - diff --git a/cave_core/websockets/django_sockets/pubsub.py b/cave_core/websockets/django_sockets/pubsub.py deleted file mode 100644 index 54daade..0000000 --- a/cave_core/websockets/django_sockets/pubsub.py +++ /dev/null @@ -1,288 +0,0 @@ -import asyncio, logging, binascii, msgpack, logging, uuid -from redis.asyncio import Redis, ConnectionPool, sentinel - -logger = logging.getLogger(__name__) - -class PubSubLayer: - """ - A meta PubSub Layer that uses Redis's pub/sub functionality and allows multiple subscriptions to - different channels that may or may not be on different cache shards. - - This layer is designed to be used with asyncio and is not necessarily thread-safe. - """ - def __init__( - self, - hosts=None, - ): - """ - Initialize the PubSub Layer - - Requires: - - hosts: A list of dictionaries - - Note: This uses a redis connection pool or a sentinel connection pool to connect to the Redis server. - - Each dictionary should contain the following key value pairs (depending on the connection pool type): - - ConnectionPool: - - address: The address of the Redis server - - EG: 'redis://localhost:6379' - - Note: This can be used alone or in conjunction with other connection pool keys - - host: The host of the Redis server - - Note: This must be used with the port key and can be used in conjunction with the other connection pool keys - - port: The port of the Redis server - - Note: This must be used with the host key and can be used in conjunction with the other connection pool keys - - Other keys listed in the redis-py ConnectionPool documentation - - SentinelConnectionPool: - - master_name: The name of the master in a Redis Sentinel setup - - sentinels: A list of sentinel addresses - - sentinel_kwargs: A dictionary of keyword arguments to pass to the sentinel connect - - Other keys listed in the redis-py SentinelConnectionPool documentation - """ - self.queue = asyncio.Queue() - self.subscriptions = dict() - if not isinstance(hosts, list): - raise ValueError("Hosts must be a list of dictionaries") - if len(hosts) == 0: - raise ValueError("Hosts must contain at least one dictionary") - self.shards = [ShardConnection(host, self) for host in hosts] - - # PubSub methods - async def subscribe(self, channel:str): - """ - Subscribe to a channel - """ - shard = self.__get_shard__(channel) - if channel not in self.subscriptions: - self.subscriptions[channel] = shard - await shard.subscribe(channel) - - async def unsubscribe(self, channel:str): - """ - Unsubscribe from a channel - """ - if channel in self.subscriptions: - shard = self.subscriptions.pop(channel) - await shard.unsubscribe(channel) - - async def send(self, channel:str, data): - """ - Send data to a channel - """ - shard = self.__get_shard__(channel) - await shard.publish(channel, data) - - - async def receive(self) -> dict|None: - """ - Get the next item from the queue. This will hang until an item is available. - - If the queue has been closed, cleanup and raise an exception to exit the calling task. - - Format: - { - 'channel': str, - 'data': Any - } - """ - try: - return await self.queue.get() - except (asyncio.CancelledError, asyncio.TimeoutError, GeneratorExit): - # Cleanup / unsubscribe on interruptions / exits / timeouts - await self.flush() - # Raise an exception to exit the calling task - raise asyncio.CancelledError - - async def flush(self): - """ - Flush the layer and close all connections. - """ - for shard in set(self.subscriptions.values()): - try: - await shard.flush() - except asyncio.CancelledError: - raise asyncio.CancelledError - except BaseException as e: - logger.exception(f"Exception while flushing shard connection: {e}") - self.subscriptions=dict() - - - # Utility Methods - def __get_shard__(self, channel): - """ - Return the shard that is used for this channel. - - This is done by assigning a shard index location based on the CRC32 of the channel name. - """ - if len(self.shards) == 1: - shard_index = 0 - else: - hash_val = binascii.crc32(channel.encode("utf8")) & 0xFFF - shard_index = int(hash_val / (4096 / float(len(self.shards)))) - return self.shards[shard_index] - - -class ShardConnection: - def __init__(self, host, pubsub_layer_obj, prefix="pubsub"): - self.connection_pool = self.__get_connection_pool__(host) - self.pubsub_layer_obj = pubsub_layer_obj - self.lock = asyncio.Lock() - self.connection = None - self.pubsub = None - self.receiver_task = None - self.subscriptions = set() - self.prefix = prefix - - # PubSub methods - async def subscribe(self, channel): - channel = self.__get_channel_name__(channel) - async with self.lock: - if channel in self.subscriptions: - return - await self.__ensure_connection__() - await self.pubsub.subscribe(channel) - self.subscriptions.add(channel) - # Drop out of the lock to start the receiver task which requires the lock to be released - await self.ensure_receiver_task() - - async def unsubscribe(self, channel): - channel = self.__get_channel_name__(channel) - async with self.lock: - if channel not in self.subscriptions: - return - await self.__ensure_connection__() - await self.pubsub.unsubscribe(channel) - self.subscriptions.remove(channel) - if len(self.subscriptions) == 0: - await self.flush() - - async def publish(self, channel, message): - channel = self.__get_channel_name__(channel) - async with self.lock: - await self.__ensure_connection__() - message = self.__serialize__(message) - # if the message is larger than 1MB, then save it as a uuid in the same cache and send the uuid - # This helps bypass the 32 MB limit on pubsub queue size for most cache servers - # Ensure that this objeect times out after 60s to keep the cache clean - if len(message) > 1024*1024: - msg_loc_key = f"{self.prefix}.{str(uuid.uuid4())}" - await self.connection.set(msg_loc_key, message, ex=60) - message = self.__serialize__(f'msg:{msg_loc_key}') - await self.connection.publish(channel, message) - - async def ensure_receiver_task(self): - async with self.lock: - if self.receiver_task is None: - self.receiver_task = asyncio.create_task(self.__receiver_task__()) - # This is needed to continue the main coroutine execution after create_task - await asyncio.sleep(0) - - async def flush(self): - # Flushing is not locked since it can be called from inside the lock - if self.receiver_task: - self.receiver_task.cancel() - try: - await self.receiver_task - except asyncio.CancelledError: - pass - self.receiver_task = None - if self.pubsub: - await self.pubsub.close() - self.pubsub = None - if self.connection: - await self.connection.close() - self.connection = None - - # Tasks - async def __receiver_task__(self): - """ - Start a task to receive messages from the pubsub and put them in the queue - - This task will run until all subscriptions are removed. - - It will loop continuously as awaiting the pubsub.get_message will not hang the event loop. - """ - # print("RECEIVER TASK STARTING") - while len(self.subscriptions) > 0: - try: - # Make sure pubsub is active and subscribed otherwise wait for subscription to be established - if self.pubsub and self.pubsub.subscribed: - # Get messages from the pubsub - message = await self.pubsub.get_message(ignore_subscribe_messages=True) - # If message is not None, put it in the channel queue - if message: - message_data = self.__deserialize__(message["data"]) - # If the message was too large, then get that message from the cache - if isinstance(message_data, str): - if message_data.startswith('msg:'): - msg_loc_key = message_data[4:] - message_data = self.__deserialize__(await self.connection.get(msg_loc_key)) - self.pubsub_layer_obj.queue.put_nowait({ - 'channel':self.__get_channel_from_name__(message["channel"].decode()), - 'data': message_data - }) - # Wait for a short time to prevent busy waiting - # This also serves to wait for the pubsub layer to be subscribed to the channel - await asyncio.sleep(0.1) - # Exit on cancellation, timeout, or generator exit (for cleanup afer connection is closed) - except (asyncio.CancelledError,asyncio.TimeoutError,GeneratorExit): - # print("RECEIVER TASK KILLED") - raise asyncio.CancelledError - except: - logger.exception("Exception while receiving message from pubsub") - await self.flush() - - # Utility Methods - async def __ensure_connection__(self): - """ - Ensure that the connection to the cache is established. - - Note: This should only be called within a lock. - """ - if not self.connection: - self.connection = Redis(connection_pool=self.connection_pool) - self.pubsub = self.connection.pubsub() - - def __get_channel_name__(self, channel): - """ - Get the channel name with the prefix. - """ - return f"{self.prefix}.{channel}" - - def __get_channel_from_name__(self, channel_name): - """ - Get the channel name without the prefix. - """ - return channel_name[len(self.prefix)+1:] - - def __serialize__(self, message): - """ - Serialize a message into bytes. - """ - return msgpack.packb(message) - - def __deserialize__(self, message): - """ - Deserialize a message from bytes. - """ - return msgpack.unpackb(message, strict_map_key=False) - - @staticmethod - def __get_connection_pool__(host:dict): - """ - Get a connection pool from a host dictionary - """ - host = host.copy() - if "address" in host: - address = host.pop("address") - return ConnectionPool.from_url(address, **host) - - master_name = host.pop("master_name", None) - if master_name is not None: - sentinels = host.pop("sentinels") - sentinel_kwargs = host.pop("sentinel_kwargs", None) - return sentinel.SentinelConnectionPool( - master_name, - sentinel.Sentinel(sentinels, sentinel_kwargs=sentinel_kwargs), - **host - ) - return ConnectionPool(**host) - - diff --git a/cave_core/websockets/django_sockets/sockets.py b/cave_core/websockets/django_sockets/sockets.py deleted file mode 100644 index 1fe2ac1..0000000 --- a/cave_core/websockets/django_sockets/sockets.py +++ /dev/null @@ -1,194 +0,0 @@ -import json, asyncio, logging -from .broadcaster import Broadcaster -from .utils import run_in_thread - -logger = logging.getLogger(__name__) - -class BaseSocketServer(Broadcaster): - def __init__(self, scope, receive, send, config=None): - """ - Initialize the socket server - - Required: - - - scope: dict = The scope of the websocket connection - - receive: method = The `get` method for an asyncio.Queue() object that will be used to receive data from the websocket client - - send: async callable = The function that should send the data to the websocket client - - Note: This function takes in a dictionary with the following keys: - - type: str = The type of message to send (always 'websocket.send') - - text: str = The data to send to the client (the json serialized version of the data sent / broadcasted) - - Optional: - - - config: dict = The configuration for the socket server - - hosts: list = A list of dictionaries that contain the host information for the socket server - - See: django_sockets.pubsub.PubSubLayer docs for more more comprehensive docs on the config parameter - - Default: - - If provided in django settings: settings.DJANGO_SOCKETS_CONFIG - - Else: {'hosts': [{'address': 'redis://localhost:6379'}]} - """ - self.scope = scope - self.__receive__ = receive - self.__send__ = send - self.is_alive = True - super().__init__(config=config) - - # Sync Functions - def send(self, data:[dict|list|str|float|int]): - """ - Send data to the websocket client. - - Note: This only sends data to the client from which the calling function was called - - Note: To send data to all clients that are subscribed to a channel, use the broadcast method - which is inherited from the Broadcaster class - - Requires: - - - data: [dict|list|str|float|int] = The data to send to the client - - Note: This data must be JSON serializable - """ - self.run_async(self.async_send(data)) - - def run_async(self, func): - """ - Run an async function in the current object's event loop - """ - asyncio.run_coroutine_threadsafe(func, self.__loop__) - - # Async Functions - async def async_send(self, data:[dict|list|str|float|int]): - """ - Send data to the websocket client. - - Note: To send data to all clients that are subscribed to a channel, use the broadcast method - which is inherited from the Broadcaster class - - Requires: - - - data: [dict|list|str|float|int] = The data to send to the client - - Note: This data must be JSON serializable - """ - if self.__send__ is None: - logger.log(logging.ERROR, "The send and async_send functions are not available because the send parameter was not provided when the socket server was initialized. To silence this warning, you can provide a function that simulates some sending behavior.") - else: - try: - json_data = json.dumps(data) - except: - raise ValueError("Data must be JSON serializable") - await self.__send__({'type': 'websocket.send', 'text': json_data}) - - async def async_handle_received_broadcast(self, channel:str, data:[dict|list|str|float|int]): - """ - Handle a received broadcast from a subscribed channel - - This method is provided so that it can be overwritten by the user if they want to handle - received broadcasts from subscriptions in a specific way. - - By default, this method will send the data to the client using the async_send method - - In general, this method should only be called by the __broadcast_listener_task__ method - - Requires: - - - channel: str = The channel that the data was broadcasted to - - data: [dict|list|str|float|int] = The data that was broadcasted - - Note: This data must be JSON serializable - """ - await self.async_send(data) - - # Tasks - async def __ws_listener_task__(self): - """ - Listen for incoming WS data and handle it accordingly - """ - if self.__receive__ is None: - logger.log(logging.ERROR, "The websocket listener task is not available because the receive parameter was not provided when the socket server was initialized. To silence this warning, you can provide an asyncio.Queue() receive parameter and put items in it to simulate received ws messages.") - else: - while self.is_alive: - data = await self.__receive__() - if data['type'] == 'websocket.receive': - try: - data_text = json.loads(data['text']) - run_in_thread(self.receive, data_text) - except: - logger.exception("Invalid JSON data received") - elif data['type'] == 'websocket.disconnect': - self.__kill__() - elif data['type'] == 'websocket.connect': - await self.__send__({'type': 'websocket.accept'}) - self.connect() - else: - raise ValueError(f"Invalid WS data type: {data['type']}") - - async def __broadcast_listener_task__(self): - """ - Handle all messages that were broadcast to subscribed channels - """ - # Only handle broadcasts if the broadcaster is usable - if self.__usable__: - while self.is_alive: - try: - channel, data = await self.async_receive_broadcast() - await self.async_handle_received_broadcast(channel, data) - # Cleanup on exit - except asyncio.CancelledError: - raise asyncio.CancelledError - except Exception as e: - raise e - - # Lifecycle Methods - def __kill__(self): - """ - Kill the socket server and stop all tasks - """ - self.is_alive = False - - async def async_start_listeners(self): - try: - # Create Tasks for the listener and queue processor - ws_listener_task = asyncio.create_task(self.__ws_listener_task__()) - broadcast_listener_task = asyncio.create_task(self.__broadcast_listener_task__()) - # wait until the socket server is killed or the tasks are cancelled - while self.is_alive: - await asyncio.sleep(0.2) - # Catch exits handled by Daphne and allow the tasks to be cancelled - except asyncio.CancelledError: - pass - # Ensure all tasks are cancelled - ws_listener_task.cancel() - broadcast_listener_task.cancel() - # Allow the cancelation to run in the background - # The next line allows the async function to return - # and the above tasks to be cancelled in the background - await asyncio.sleep(0) - - def start_listeners(self): - """ - Start the listeners for the socket server - """ - self.run_async(self.async_start_listeners()) - - # Utility Methods - @classmethod - async def as_asgi(cls, scope, receive, send): - """ - An ASGI application runner function that can be called by Daphne. - - This creates a new socket server instance and starts the listeners in the background. - """ - # Wrap in a try / except block to catch unclean exits handled by Daphne - socket_server = cls(scope, receive, send) - await socket_server.async_start_listeners() - - # Placeholder Methods - def receive(self, data): - """ - Placeholder method for the receive method that must be overwritten by the user - - This is the method that will be called when data is received from the ws client. - """ - raise NotImplementedError("The receive method must be implemented by the user") - - def connect(self): - """ - Placeholder method for the connect method that can be overwritten by the user. - """ - pass \ No newline at end of file diff --git a/cave_core/websockets/django_sockets/utils.py b/cave_core/websockets/django_sockets/utils.py deleted file mode 100644 index 4aba150..0000000 --- a/cave_core/websockets/django_sockets/utils.py +++ /dev/null @@ -1,185 +0,0 @@ -from django.conf import settings -from django.db import close_old_connections -from django.core.exceptions import ImproperlyConfigured -from django.urls.exceptions import Resolver404 -from django.urls.resolvers import RegexPattern, RoutePattern, URLResolver - -import asyncio, logging, threading -from asgiref.sync import SyncToAsync - -logger = logging.getLogger(__name__) - -def get_config(config=None): - """ - Get the configuration for the socket server - """ - # If the config is passed, return it. - if config is not None: - return config - # If the config is not passed, try to get it from the Django settings - elif hasattr(settings, 'DJANGO_SOCKETS_CONFIG'): - return settings.DJANGO_SOCKETS_CONFIG - # If nothing has been returned yet, return a default configuration - return { - "hosts": [ - {"address": "redis://localhost:6379"} - ] - } - -def run_in_thread(command, *args, **kwargs): - """ - Takes in a synchronous command along with args and kwargs and runs it in a background - thread that is not tied to the websocket connection. - - This will be terminated when the larger daphne server is terminated - """ - thread = threading.Thread(target=command, args=args, kwargs=kwargs, daemon=True) - thread.start() - return thread - -def start_event_loop_thread(loop): - """ - Starts the event loop in a new thread - """ - asyncio.set_event_loop(loop) - loop.run_forever() - -def ensure_loop_running(loop=None): - """ - Starts the event loop in a new thread and returns the thread - """ - loop = loop if loop is not None else asyncio.get_event_loop() - if not loop.is_running(): - try: - thread = run_in_thread(start_event_loop_thread, loop) - except: - logger.log(logging.ERROR, "Event Loop already running") - return loop - -# The following code is copied directly from Django Channels (channels/db.py) -# Begin Code Copy: -################################################################################ - -class DatabaseSyncToAsync(SyncToAsync): - """ - SyncToAsync version that cleans up old database connections when it exits. - """ - - def thread_handler(self, loop, *args, **kwargs): - close_old_connections() - try: - return super().thread_handler(loop, *args, **kwargs) - finally: - close_old_connections() - -# The class is TitleCased, but we want to encourage use as a callable/decorator -database_sync_to_async = DatabaseSyncToAsync -################################################################################ -# End Code Copy: - - -# The following code is copied directly from Django Channels (channels/routing.py) -# Begin Code Copy: -################################################################################ -class ProtocolTypeRouter: - """ - Takes a mapping of protocol type names to other Application instances, - and dispatches to the right one based on protocol name (or raises an error) - """ - - def __init__(self, application_mapping): - self.application_mapping = application_mapping - - async def __call__(self, scope, receive, send): - if scope["type"] in self.application_mapping: - application = self.application_mapping[scope["type"]] - return await application(scope, receive, send) - else: - raise ValueError( - "No application configured for scope type %r" % scope["type"] - ) - -class URLRouter: - """ - Routes to different applications/consumers based on the URL path. - - Works with anything that has a ``path`` key, but intended for WebSocket - and HTTP. Uses Django's django.urls objects for resolution - - path() or re_path(). - """ - - #: This router wants to do routing based on scope[path] or - #: scope[path_remaining]. ``path()`` entries in URLRouter should not be - #: treated as endpoints (ended with ``$``), but similar to ``include()``. - _path_routing = True - - def __init__(self, routes): - self.routes = routes - - for route in self.routes: - # The inner ASGI app wants to do additional routing, route - # must not be an endpoint - if getattr(route.callback, "_path_routing", False) is True: - pattern = route.pattern - if isinstance(pattern, RegexPattern): - arg = pattern._regex - elif isinstance(pattern, RoutePattern): - arg = pattern._route - else: - raise ValueError(f"Unsupported pattern type: {type(pattern)}") - route.pattern = pattern.__class__(arg, pattern.name, is_endpoint=False) - - if not route.callback and isinstance(route, URLResolver): - raise ImproperlyConfigured( - "%s: include() is not supported in URLRouter. Use nested" - " URLRouter instances instead." % (route,) - ) - - async def __call__(self, scope, receive, send): - # Get the path - path = scope.get("path_remaining", scope.get("path", None)) - if path is None: - raise ValueError("No 'path' key in connection scope, cannot route URLs") - - if "path_remaining" not in scope: - # We are the outermost URLRouter, so handle root_path if present. - root_path = scope.get("root_path", "") - if root_path and not path.startswith(root_path): - # If root_path is present, path must start with it. - raise ValueError("No route found for path %r." % path) - path = path[len(root_path) :] - - # Remove leading / to match Django's handling - path = path.lstrip("/") - # Run through the routes we have until one matches - for route in self.routes: - try: - match = route.pattern.match(path) - if match: - new_path, args, kwargs = match - # Add defaults to kwargs from the URL pattern. - kwargs.update(route.default_args) - # Add args or kwargs into the scope - outer = scope.get("url_route", {}) - application = route.callback - return await application( - dict( - scope, - path_remaining=new_path, - url_route={ - "args": outer.get("args", ()) + args, - "kwargs": {**outer.get("kwargs", {}), **kwargs}, - }, - ), - receive, - send, - ) - except Resolver404: - pass - else: - if "path_remaining" in scope: - raise Resolver404("No route found for path %r." % path) - # We are the outermost URLRouter - raise ValueError("No route found for path %r." % path) -################################################################################ -# End Code Copy: \ No newline at end of file diff --git a/cave_core/websockets/socket_server.py b/cave_core/websockets/socket_server.py index 0544038..a1eeaf7 100644 --- a/cave_core/websockets/socket_server.py +++ b/cave_core/websockets/socket_server.py @@ -1,7 +1,7 @@ from django.conf import settings from .commands import get_command -from .django_sockets.sockets import BaseSocketServer +from django_sockets.sockets import BaseSocketServer class Request: """ diff --git a/requirements.txt b/requirements.txt index ddda237..17ca8b0 100755 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ daphne==4.1.2 Django==5.1.1 django-cors-headers==4.4.0 django-import-export==4.1.1 +django_sockets==1.0.0 django-solo==2.3.0 djangorestframework==3.15.2 msgpack==1.1.0 From 07700d050d3e031f531941c60e17770928a18fdf Mon Sep 17 00:00:00 2001 From: Willem Guter Date: Fri, 8 Nov 2024 15:56:25 -0500 Subject: [PATCH 05/25] Fix export example --- cave_api/cave_api/examples/api_command_export.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cave_api/cave_api/examples/api_command_export.py b/cave_api/cave_api/examples/api_command_export.py index 812b504..5b9f933 100644 --- a/cave_api/cave_api/examples/api_command_export.py +++ b/cave_api/cave_api/examples/api_command_export.py @@ -1,3 +1,5 @@ +import json + def execute_command(session_data, socket, command="init", **kwargs): # `init` is the default command that is run when a session is created # It should return an initial state for the app @@ -42,7 +44,7 @@ def execute_command(session_data, socket, command="init", **kwargs): # For this example, `myCommand` defined in the api in appBar.data.myCommandButton.apiCommand elif command == "myCommand": # Send the current session data to app users - socket.export(session_data) + socket.export(f'data:application/json,{json.dumps(session_data)}') # Log a message in the console print("Console Log: `myCommand` has been triggered!") return session_data From dbc21ecbf8f01fd3217cd89c4bc02e4b12838f34 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 09:26:44 -0500 Subject: [PATCH 06/25] Minor Brodcast Cleanup --- cave_core/models.py | 17 +++++++++++++---- cave_core/websockets/cave_ws_broadcaster.py | 5 ----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cave_core/models.py b/cave_core/models.py index 6318f17..53b41b3 100755 --- a/cave_core/models.py +++ b/cave_core/models.py @@ -1081,7 +1081,7 @@ def get_data(self, keys:list[str]=None, client_only:bool=True) -> dict: pamda.assocPath(path=['data', key], value=value, data=self.__dict__) return {key:pamda.path(['data', key], self.__dict__) for key in keys} - def broadcast_changed_data(self, previous_versions: dict) -> None: + def broadcast_changed_data(self, previous_versions: dict, broadcast_loading:bool=True) -> None: """ Broadcasts and returns all data that has changed given some set of previous versions @@ -1090,6 +1090,13 @@ def broadcast_changed_data(self, previous_versions: dict) -> None: - `previous_versions`: - Type: dict - What: The endpoint provided previous versions to check vs the current server versions to determine which data has changed + + Optional: + + - `broadcast_loading`: + - Type: bool + - What: If True, the loading state will be broadcasted to all users before and after the data is broadcasted + - Default: True """ # print('==BROADCAST CHANGED DATA==') # Fill in missing session data if none is present @@ -1105,13 +1112,15 @@ def broadcast_changed_data(self, previous_versions: dict) -> None: ] data = self.get_data(client_only=True, keys=updated_keys) # Broadcast the updated versions and data - self.broadcast_loading(True) + if broadcast_loading: + self.broadcast_loading(True) CaveWSBroadcaster(self).broadcast( event="overwrite", versions=versions, data=data, ) - self.broadcast_loading(False) + if broadcast_loading: + self.broadcast_loading(False) # print('==BROADCAST CHANGED DATA END==') def replace_data(self, data, wipeExisting): @@ -1226,7 +1235,7 @@ def execute_api_command( # Broadcast the changed data if specified if broadcast_changes: - self.broadcast_changed_data(previous_versions=previous_versions) + self.broadcast_changed_data(previous_versions=previous_versions, broadcast_loading=False) # Update the execution state overriding any blocks self.set_loading(False, override_block=True) # print('==EXECUTE API COMMAND END==\n') diff --git a/cave_core/websockets/cave_ws_broadcaster.py b/cave_core/websockets/cave_ws_broadcaster.py index 3f4b907..10e7a5e 100755 --- a/cave_core/websockets/cave_ws_broadcaster.py +++ b/cave_core/websockets/cave_ws_broadcaster.py @@ -18,10 +18,6 @@ ) theme_list = set(["primary", "secondary", "error", "warning", "info", "success"]) -# Loading Events -loading_true = {"event": "updateLoading", "data": {"data_path": ["data_loading"], "data": True}} -loading_false = {"event": "updateLoading", "data": {"data_path": ["data_loading"], "data": False}} - class CaveWSBroadcaster: def __init__(self, model_object): self.model_object = model_object @@ -63,7 +59,6 @@ def broadcast(self, event: str, data: dict, **kwargs): - Type: str - What: The event to broadcast - Allowed Values: "mutation", "overwrite", "message", "updateSessions", "updateLoading" - - Note: If `event` is "overwrite", then a loading broadcast will be sent instead - `data`: - Type: dict - What: The data to broadcast From 5fd5a981e1b2403685919d34c38fb14d8f02e463 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 12:10:35 -0500 Subject: [PATCH 07/25] Auto heal user_ids cache object on cache data loss. --- cave_core/models.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cave_core/models.py b/cave_core/models.py index 53b41b3..87ceb75 100755 --- a/cave_core/models.py +++ b/cave_core/models.py @@ -990,7 +990,12 @@ def get_user_ids(self) -> list: - EG To broadcast messgaes to everyone in the session - EG to prevent deletion if more than one user is in the session """ - return cache.get(f"session:{self.id}:user_ids", []) + user_ids = cache.get(f"session:{self.id}:user_ids", None) + # Auto heal User Ids On Cache Data Loss + if user_ids == None: + self.update_user_ids() + user_ids = cache.get(f"session:{self.id}:user_ids", []) + return user_ids def update_user_ids(self) -> None: """ @@ -1112,6 +1117,7 @@ def broadcast_changed_data(self, previous_versions: dict, broadcast_loading:bool ] data = self.get_data(client_only=True, keys=updated_keys) # Broadcast the updated versions and data + if broadcast_loading: self.broadcast_loading(True) CaveWSBroadcaster(self).broadcast( From dd02f8f00a2358d83fcbc9b1715e0de8ccb576f4 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 14:33:53 -0500 Subject: [PATCH 08/25] Get cache persistence working with serverless caches. --- .gitignore | 3 + cave_app/settings/development.py | 14 +- cave_app/storage_backends.py | 2 +- cave_core/management/commands/clearcache.py | 2 +- cave_core/models.py | 23 ++- cave_core/utils/cache.py | 163 +++++--------------- cave_core/utils/session_persistence.py | 44 ++++++ example.env | 16 +- persistent_cache/.gitignore | 4 - 9 files changed, 116 insertions(+), 155 deletions(-) create mode 100644 cave_core/utils/session_persistence.py delete mode 100644 persistent_cache/.gitignore diff --git a/.gitignore b/.gitignore index 81e6fc1..0e607ce 100755 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,6 @@ venv # Logs logs + +# Persistent Cache +__cache__ \ No newline at end of file diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index 51fbbb0..cadde71 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -175,20 +175,14 @@ CACHE_TIMEOUT = config("CACHE_TIMEOUT", default=0, cast=int) assert CACHE_TIMEOUT >= 0, "CACHE_TIMEOUT must be greater than or equal to 0" assert CACHE_BACKUP_INTERVAL >= 0, "CACHE_BACKUP_INTERVAL must be greater than or equal to 0" -if CACHE_TIMEOUT > 0 and CACHE_BACKUP_INTERVAL == 0: - print("CACHE_TIMEOUT is set but CACHE_BACKUP_INTERVAL is not set") - print("Setting CACHE_BACKUP_INTERVAL to a third of CACHE_TIMEOUT") - CACHE_BACKUP_INTERVAL = CACHE_TIMEOUT//3 -if CACHE_TIMEOUT>0 and CACHE_TIMEOUT < CACHE_BACKUP_INTERVAL*2: - print("CACHE_TIMEOUT must be greater than CACHE_BACKUP_INTERVAL * 2") - print("Setting CACHE_TIMEOUT to CACHE_BACKUP_INTERVAL*3") - CACHE_TIMEOUT = CACHE_BACKUP_INTERVAL*3 +assert CACHE_TIMEOUT >= CACHE_BACKUP_INTERVAL*2, "CACHE_TIMEOUT must be at least twice as long as CACHE_BACKUP_INTERVAL" +assert CACHE_TIMEOUT == 0 or CACHE_BACKUP_INTERVAL > 0, "CACHE_BACKUP_INTERVAL must be greater than 0 if CACHE_TIMEOUT is greater than 0" CACHE_TIMEOUT = None if CACHE_TIMEOUT == 0 else CACHE_TIMEOUT +CACHE_BACKUP_INTERVAL = None if CACHE_BACKUP_INTERVAL == 0 else CACHE_BACKUP_INTERVAL CACHES = { "default": { "BACKEND": "django.core.cache.backends.redis.RedisCache", - "LOCATION": f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}", - "TIMEOUT": CACHE_TIMEOUT, + "LOCATION": f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}" } } ################################################################ diff --git a/cave_app/storage_backends.py b/cave_app/storage_backends.py index 9811269..b099624 100755 --- a/cave_app/storage_backends.py +++ b/cave_app/storage_backends.py @@ -11,7 +11,7 @@ class StaticStorage(StaticFilesStorage): pass class CacheStorage(FileSystemStorage): - location = 'persistent_cache' + location = '__cache__' # Special code to always overwrite the file on a save def get_available_name(self, name: str, max_length: int | None = None) -> str: if self.exists(name): diff --git a/cave_core/management/commands/clearcache.py b/cave_core/management/commands/clearcache.py index 996d284..cee3a0b 100644 --- a/cave_core/management/commands/clearcache.py +++ b/cave_core/management/commands/clearcache.py @@ -9,6 +9,6 @@ def handle(self, *args, **options): self.stdout.write("Clearing the Cache (memory and persistent)...") try: cache = Cache() - cache.delete_pattern("*") + cache.flush(memory=True, persistent=True) except Exception as e: raise CommandError(f"Failed to clear the cache with the following error: {e}") \ No newline at end of file diff --git a/cave_core/models.py b/cave_core/models.py index 87ceb75..ee7266b 100755 --- a/cave_core/models.py +++ b/cave_core/models.py @@ -16,6 +16,7 @@ from cave_core.utils.cache import Cache from cave_core.utils.constants import api_keys, background_api_keys from cave_core.utils.validators import limit_upload_size +from cave_core.utils.session_persistence import session_persistence_service from cave_api.api import execute_command from cave_app.storage_backends import PrivateMediaStorage, PublicMediaStorage @@ -1336,6 +1337,20 @@ def clone(self, name, description): cache.set_many({f"session:{new_session.id}:data:{key}": value for key, value in session_data.items()}) new_session.set_versions({key:0 for key in session_data.keys()}) return new_session + + def get_cache_keys(self): + """ + Gets all cache keys for this session + """ + keys = [f"session:{self.id}:{key}" for key in ["versions", "executing", "user_ids"]] + keys += [f"session:{self.id}:data:{key}" for key in list(cache.get(f"session:{self.id}:versions", {}).keys())] + return keys + + def persist_cache_data(self): + """ + Persists the current session data to the persistent cache + """ + cache.persist_many(self.get_cache_keys()) def error_on_session_not_empty(self): """ @@ -1429,11 +1444,8 @@ def handle_session_on_delete(sender, instance, **kwargs): """ instance.team.update_sessions_list() # Clear the data from the cache and persistent cache if present - cache_session_id = f"session:{instance.id}" - generic_keys = [f"{cache_session_id}:{key}" for key in ['versions', 'executing','user_ids']] - data_keys = [f"{cache_session_id}:data:{key}" for key in cache.get(f"{cache_session_id}:versions", {}).keys()] cache.delete_many( - data_keys + generic_keys, + instance.get_cache_keys(), memory=True, persistent=True ) @@ -1452,3 +1464,6 @@ def update_team_ids(sender, instance, **kwargs): def create_personal_team(sender, instance, created, **kwargs): if created: instance.create_personal_team() + +# Services +session_persistence_service(cache=cache, Sessions=Sessions) \ No newline at end of file diff --git a/cave_core/utils/cache.py b/cave_core/utils/cache.py index bf1d71f..b76ce1d 100644 --- a/cave_core/utils/cache.py +++ b/cave_core/utils/cache.py @@ -3,99 +3,13 @@ from django.core.files.base import ContentFile from cave_app.storage_backends import CacheStorage -import json, time, os -from pamda import pamda - -@pamda.thunkify -def persist_cache_background_service(persistent_cache, id_regex:str): - """ - Persists the data in the cache to the persistent storage - - persistent_cache: Cache - The persistent cache object to use - id_regex: str - The regex to use to find the keys in the cache - - Notes: - - This function is designed to be run in a separate thread - - This function will only work for Redis based caches - """ - print('Starting the cache persistence background service...') - while True: - try: - # Interruptable sleep - for i in range(settings.CACHE_BACKUP_INTERVAL): - time.sleep(1) - # Load the meta data - # This is used to prevent multiple backups from happening at the same time - meta = persistent_cache.get('meta') - if meta is None: - meta = {'last_update':0} - - now = time.time() - # Assume multiple servers are running - only run this if the last update was long enough ago - if meta['last_update']+settings.CACHE_BACKUP_INTERVAL 0: - service = persist_cache_background_service(self, 'session:*') - service.asyncRun(daemon=True) - - def __format_low_level_cache_key__(self, key: bytes): - return ":".join(key.decode().split(':')[2:]) - - def __low_level_cache__(self): - return self.cache._cache.get_client() - - def __low_level_cache_keys__(self, pattern:str): - return [self.__format_low_level_cache_key__(key) for key in self.__low_level_cache__().keys(pattern)] - - def keys(self, pattern:str, memory:bool=False, persistent:bool=False): - """ - Gets the keys in the cache based on a pattern - - Note: This function will only work for Valkey or Redis based caches - Note: This function will often not be supported by scalable or serverless Caches - - pattern: str - The pattern to use to find the keys - Note: The pattern is a simple string pattern to indicate key starts with - Note: A '*' can be used as a wildcard at the end of the pattern but not in the middle - memory: bool - Whether to get the keys from the memory based cache - Default: False - Note: Can not be True if persistent is True - persistent: bool - Whether to get the keys from the persistent storage cache - Default: False - Note: Can not be True if memory is True - """ - assert memory ^ persistent, "Cache.keys(): Either memory or persistent must be True but not both" - if memory: - pattern = f"*{pattern}" if not pattern.startswith("*") else pattern - return self.__low_level_cache_keys__(pattern) - else: - if pattern == '*': - return [item for item in self.listdir('')[1] if item != '.gitignore'] - return [item for item in self.listdir('')[1] if item != '.gitignore' and pattern.replace('*', '') in item] - - def get(self, data_id:str, default=None): """ Gets the data from the cache if it exists, otherwise from the persistent storage and caches it @@ -114,8 +28,7 @@ def get(self, data_id:str, default=None): data = self.cache.get(data_id, "__NONE__") if data != "__NONE__": return data - # Only pull from the persistent storage if the CACHE_BACKUP_INTERVAL is greater than 0 - if settings.CACHE_BACKUP_INTERVAL > 0: + else: try: with self.open(data_id) as f: data = json.load(f) @@ -140,25 +53,7 @@ def get_many(self, data_ids:list, default=None): Returns: dict """ # print(f'Cache -> Getting: {data_ids}') - # Note: This has to be done in a loop because self.cache.get_many() is not always reliable (esp for Serverless Caches) - data = {data_id:self.cache.get(data_id, "__NONE__") for data_id in data_ids} - missing_ids = [data_id for data_id, data_value in data.items() if data_value == "__NONE__"] - if len(missing_ids) > 0: - # Only pull from the persistent storage if the CACHE_BACKUP_INTERVAL is greater than 0 - if settings.CACHE_BACKUP_INTERVAL > 0: - missing_data = {} - for data_id in missing_ids: - try: - with self.open(data_id) as f: - missing_data[data_id] = json.load(f) - self.set(data_id, missing_data[data_id]) - except: - missing_data[data_id] = default - data.update(missing_data) - else: - for data_id in missing_ids: - data[data_id] = default - return data + return {data_id:self.get(data_id, default) for data_id in data_ids} def set(self, data_id:str, data:dict, memory:bool=True, persistent:bool=False, timeout:[int|None]=settings.CACHE_TIMEOUT): """ @@ -209,6 +104,27 @@ def set_many(self, data:dict, memory:bool=True, persistent:bool=False, timeout:[ for data_id, data in data.items(): self.set(data_id, data, memory=memory, persistent=persistent, timeout=timeout) + def persist(self, data_id:str): + """ + Persists the data in the cache to the persistent storage + + data_id: str + The data_id of the data to be persisted + """ + data = self.cache.get(data_id, "__NONE__") + if data != "__NONE__": + self.set(data_id, data, memory=False, persistent=True) + + def persist_many(self, data_ids:list): + """ + Persists the data in the cache to the persistent storage + + data_ids: list + The data_ids of the data to be persisted + """ + for data_id in data_ids: + self.persist(data_id) + def delete(self, data_id:str, memory:bool=False, persistent:bool=False): """ Deletes the data in one or both of the cache and the persistent storage @@ -223,7 +139,7 @@ def delete(self, data_id:str, memory:bool=False, persistent:bool=False): Default: False """ # print(f'Cache -> Deleting: {data_id}') - assert memory or persistent, "Either memory or persistent must be True" + assert memory or persistent, "Cache.delete(): `memory` or `persistent` must be True" if memory: self.cache.delete(data_id) if persistent: @@ -246,33 +162,26 @@ def delete_many(self, data_ids:list, memory:bool=False, persistent:bool=False): Default: False """ # print(f'Cache -> Deleting: {data_ids}') - assert memory or persistent, "Cache.delete_many(): Either memory or persistent must be True" + assert memory or persistent, "Cache.delete_many(): `memory` or `persistent` must be True" # Note: This uses a loop instead of self.cache.delete_many() because the latter # is not always supported by cache backends (esp Serverless Caches) for data_id in data_ids: self.delete(data_id, memory=memory, persistent=persistent) - def delete_pattern(self, pattern:str, memory:bool=True, persistent:bool=True): + def flush(self, memory:bool=False, persistent:bool=False): """ - Deletes the data in one or both of the cache and the persistent storage based on a pattern + Flushes the cache and/or the persistent storage - Note: This requires the cache server to support the keys() function which is not always the case - (esp for Serverless Caches) - - pattern: str - The pattern to use to find the data to be deleted - Note: The pattern is a simple string pattern to indicate key starts with - Note: A '*' can be used as a wildcard at the end of the pattern but not in the middle memory: bool - Whether to delete the data from the cache - Default: True + Whether to flush the cache + Default: False persistent: bool - Whether to delete the data from the persistent storage - Default: True + Whether to flush the persistent storage + Default: False """ - # print(f'Cache -> Deleting: {pattern}') - assert memory or persistent, "Cache.delete_pattern(): Either memory or persistent must be True" + # print(f'Cache -> Flushing') if memory: - self.delete_many(self.keys(pattern, memory=True), memory=True) + self.cache.clear() if persistent: - self.delete_many(self.keys(pattern, persistent=True), persistent=True) \ No newline at end of file + files = self.listdir("")[1] + self.delete_many(files, persistent=True) \ No newline at end of file diff --git a/cave_core/utils/session_persistence.py b/cave_core/utils/session_persistence.py new file mode 100644 index 0000000..655dcf4 --- /dev/null +++ b/cave_core/utils/session_persistence.py @@ -0,0 +1,44 @@ +from django.conf import settings +import os, time +from pamda import pamda + +@pamda.thunkify +def __session_persistence_service_task__(Sessions, cache): + """ + Persists the data in the cache to the persistent storage + + Sessions: Sessions + The Sessions object to be used for the cache + + Notes: + - This function is designed to be run in a separate thread + """ + # Run the persistence background tasks + # Checking if RUN_MAIN is true ensures that the background tasks are only run once on initial server start + print('Starting the cache persistence background service...') + while True: + try: + # Interruptable sleep + for i in range(settings.CACHE_BACKUP_INTERVAL): + time.sleep(1) + # Load the meta data + # This is used to prevent multiple backups from happening at the same time + meta = cache.get('meta') + if meta is None: + meta = {'last_update':0} + now = time.time() + # Assume multiple servers are running - only run this if the last update was long enough ago + if meta['last_update']+settings.CACHE_BACKUP_INTERVAL Date: Wed, 13 Nov 2024 15:21:09 -0500 Subject: [PATCH 09/25] Fix bug in config assertions. --- cave_app/settings/development.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index cadde71..8f97265 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -175,7 +175,9 @@ CACHE_TIMEOUT = config("CACHE_TIMEOUT", default=0, cast=int) assert CACHE_TIMEOUT >= 0, "CACHE_TIMEOUT must be greater than or equal to 0" assert CACHE_BACKUP_INTERVAL >= 0, "CACHE_BACKUP_INTERVAL must be greater than or equal to 0" -assert CACHE_TIMEOUT >= CACHE_BACKUP_INTERVAL*2, "CACHE_TIMEOUT must be at least twice as long as CACHE_BACKUP_INTERVAL" +if CACHE_TIMEOUT > 0: + assert CACHE_TIMEOUT >= CACHE_BACKUP_INTERVAL*2, "CACHE_TIMEOUT must be at least twice as long as CACHE_BACKUP_INTERVAL" + assert CACHE_BACKUP_INTERVAL > 0, "CACHE_BACKUP_INTERVAL must be greater than 0 if CACHE_TIMEOUT is greater than 0" assert CACHE_TIMEOUT == 0 or CACHE_BACKUP_INTERVAL > 0, "CACHE_BACKUP_INTERVAL must be greater than 0 if CACHE_TIMEOUT is greater than 0" CACHE_TIMEOUT = None if CACHE_TIMEOUT == 0 else CACHE_TIMEOUT CACHE_BACKUP_INTERVAL = None if CACHE_BACKUP_INTERVAL == 0 else CACHE_BACKUP_INTERVAL From 87591f2d2fcd36c71ba32b665371d6e5718a7488 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 15:28:35 -0500 Subject: [PATCH 10/25] Remove unnecessary setting --- cave_app/settings/development.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index 8f97265..7a0401a 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -178,7 +178,6 @@ if CACHE_TIMEOUT > 0: assert CACHE_TIMEOUT >= CACHE_BACKUP_INTERVAL*2, "CACHE_TIMEOUT must be at least twice as long as CACHE_BACKUP_INTERVAL" assert CACHE_BACKUP_INTERVAL > 0, "CACHE_BACKUP_INTERVAL must be greater than 0 if CACHE_TIMEOUT is greater than 0" -assert CACHE_TIMEOUT == 0 or CACHE_BACKUP_INTERVAL > 0, "CACHE_BACKUP_INTERVAL must be greater than 0 if CACHE_TIMEOUT is greater than 0" CACHE_TIMEOUT = None if CACHE_TIMEOUT == 0 else CACHE_TIMEOUT CACHE_BACKUP_INTERVAL = None if CACHE_BACKUP_INTERVAL == 0 else CACHE_BACKUP_INTERVAL CACHES = { From a95ae74372ecd9c6ec01f85254048b1008f5b802 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 17:05:01 -0500 Subject: [PATCH 11/25] Migrate to django_sockets 1.2.0 --- cave_core/management/commands/cache_test.py | 3 ++- cave_core/websockets/cave_ws_broadcaster.py | 5 ++--- cave_core/websockets/socket_server.py | 3 +++ requirements.txt | 2 +- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cave_core/management/commands/cache_test.py b/cave_core/management/commands/cache_test.py index 5f67778..e14c954 100644 --- a/cave_core/management/commands/cache_test.py +++ b/cave_core/management/commands/cache_test.py @@ -1,3 +1,4 @@ +from django.conf import settings from django.core.management.base import BaseCommand from cave_core.utils.cache import Cache from django_sockets.sockets import BaseSocketServer @@ -16,7 +17,7 @@ async def send(ws_data): # Test the socket server cache process base_receive = asyncio.Queue() - base_socket_server = BaseSocketServer(scope={}, receive=base_receive.get, send=send) + base_socket_server = BaseSocketServer(scope={}, receive=base_receive.get, send=send, config=settings.DJANGO_SOCKETS_CONFIG) base_socket_server.start_listeners() base_socket_server.subscribe("test_channel") # Small message diff --git a/cave_core/websockets/cave_ws_broadcaster.py b/cave_core/websockets/cave_ws_broadcaster.py index 10e7a5e..45f1127 100755 --- a/cave_core/websockets/cave_ws_broadcaster.py +++ b/cave_core/websockets/cave_ws_broadcaster.py @@ -1,9 +1,8 @@ -# External Imports +from django.conf import settings import type_enforced - from django_sockets.broadcaster import Broadcaster -broadcaster = Broadcaster() +broadcaster = Broadcaster(config=settings.DJANGO_SOCKETS_CONFIG) # Constants acceptable_events = set( diff --git a/cave_core/websockets/socket_server.py b/cave_core/websockets/socket_server.py index a1eeaf7..fc0dfa6 100644 --- a/cave_core/websockets/socket_server.py +++ b/cave_core/websockets/socket_server.py @@ -12,6 +12,9 @@ def __init__(self, user, data): self.user = user class SocketServer(BaseSocketServer): + def get_config(self): + return settings.DJANGO_SOCKETS_CONFIG + def receive(self, data): if settings.DEBUG: print("WS RECEIVE ", data['command']) diff --git a/requirements.txt b/requirements.txt index 17ca8b0..8dacd25 100755 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ daphne==4.1.2 Django==5.1.1 django-cors-headers==4.4.0 django-import-export==4.1.1 -django_sockets==1.0.0 +django_sockets==1.2.0 django-solo==2.3.0 djangorestframework==3.15.2 msgpack==1.1.0 From d75d714cc9e52ffea9a6813670b0482fac0c475a Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 17:10:38 -0500 Subject: [PATCH 12/25] Prep for merge. --- cave_core/utils/broadcasting.py | 170 -------------------------------- 1 file changed, 170 deletions(-) delete mode 100755 cave_core/utils/broadcasting.py diff --git a/cave_core/utils/broadcasting.py b/cave_core/utils/broadcasting.py deleted file mode 100755 index de3188e..0000000 --- a/cave_core/utils/broadcasting.py +++ /dev/null @@ -1,170 +0,0 @@ -# Framework Imports -from channels.layers import get_channel_layer - -# External Imports -from asgiref.sync import async_to_sync -import json, type_enforced - -channel_layer = get_channel_layer() -sync_send = async_to_sync(channel_layer.group_send) - -acceptable_events = set( - [ - "mutation", - "overwrite", - "message", - "updateSessions", - "updateLoading", - "export", - ] -) - -theme_list = set(["primary", "secondary", "error", "warning", "info", "success"]) - - -class Socket: - def __init__(self, model_object): - self.model_object = model_object - - def format_broadcast_payload(self, event: str, data: dict, **kwargs): - """ - Formats a broadcast payload - - Requires: - - - `event`: - - Type: str - - What: The event to broadcast - - Allowed Values: "mutation", "overwrite", "message", "updateSessions", "updateLoading" - - `data`: - - Type: dict (json serializable) - - What: The data to broadcast - - `**kwargs`: - - Type: dict (json serializable) - - What: Any additional data to serialize into the payload not under its own key in the payload - - Note: This will not be in `data` in the payload - - """ - if event not in acceptable_events: - raise ValueError( - f"Invalid Event ('{event}'). Allowed events include: {acceptable_events}" - ) - if not isinstance(data, dict): - raise TypeError(f"Invalid `data` type ('{type(data)}'). `data` must be a dict.") - return json.dumps({"event": event, "data": data, **kwargs}) - - def broadcast(self, event: str, data: dict, **kwargs): - """ - Broadcasts a message to all users related to an object by object.get_user_ids() - - Requires: - - - `event`: - - Type: str - - What: The event to broadcast - - Allowed Values: "mutation", "overwrite", "message", "updateSessions", "updateLoading" - - Note: If `event` is "overwrite", then a loading broadcast will be sent instead - - `data`: - - Type: dict - - What: The data to broadcast - """ - payload = self.format_broadcast_payload(event=event, data=data, **kwargs) - # Note: broadcast_type refers to the function called in consumer.py - broadcast_type = "loadingbroadcast" if event == "overwrite" else "broadcast" - for user_id in self.model_object.get_user_ids(): - sync_send(str(user_id), {"type": broadcast_type, "payload": payload}) - - @type_enforced.Enforcer - def notify( - self, - message: str, - title: str = "", - show: bool = True, - theme: str = "info", - duration: int = 10, - **kwargs, - ): - """ - Notify end users with a message - - Requires: - - - `message`: - - Type: str - - What: The message to display to the user - - Optional: - - - `title`: - - Type: str - - What: The title of the message - - `show`: - - Type: bool - - What: Whether or not to show the message - - Default: True - - `theme`: - - Type: str - - What: The theme of the message - - Default: "info" - - Allowed Values: "primary", "secondary", "error", "warning", "info", "success" - - `duration`: - - Type: int - - What: The duration in seconds to show the message - - Default: 10 - - `**kwargs`: - - Type: dict (json serializable) - - What: Any additional data to serialize and pass to the user - - Example: - - ``` - from cave_core.utils.broadcasting import Socket - Socket(request.user.session).notify( - message="Hello World!", - title="Hello:", - show=True, - theme="info", - duration=10, - ) - ``` - """ - if theme not in theme_list: - raise ValueError(f"Invalid `theme` ('{theme}'). Allowed `theme`s include: {theme_list}") - self.broadcast( - event="message", - data={ - "snackbarShow": show, - "snackbarType": theme, - "title": title, - "message": message, - "duration": duration, - **kwargs, - }, - loading=False, - ) - - def export( - self, - data, - name="session-data.json", - ): - """ - Send end users a json serializable object which is downloaded by the client to the user's device - - Requires: - - - `data`: - - Type: dict - - What: Json encodable data to send to the user - - Optional: - - - `name`: - - Type: str - - What: The name of the file to download - - Default: "session-data.json" - """ - self.broadcast( - event="export", - data={"data": data, "name": name}, - ) From 06cf27ec08dfa347bd01cd407becb2167df825a7 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Wed, 13 Nov 2024 17:17:13 -0500 Subject: [PATCH 13/25] Post merge cleanup. --- cave_core/websockets/cave_ws_broadcaster.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/cave_core/websockets/cave_ws_broadcaster.py b/cave_core/websockets/cave_ws_broadcaster.py index 45f1127..c1215f5 100755 --- a/cave_core/websockets/cave_ws_broadcaster.py +++ b/cave_core/websockets/cave_ws_broadcaster.py @@ -133,11 +133,12 @@ def notify( }, loading=False, ) - + @type_enforced.Enforcer def export( self, - data: dict, + data, + name="session-data.json", ): """ Send end users a json serializable object which is downloaded by the client to the user's device @@ -147,8 +148,15 @@ def export( - `data`: - Type: dict - What: Json encodable data to send to the user + + Optional: + + - `name`: + - Type: str + - What: The name of the file to download + - Default: "session-data.json" """ self.broadcast( event="export", - data=data, + data={"data": data, "name": name}, ) From a47f29e43e3bcdc2b20af6c089ea9dfb7a6d9e01 Mon Sep 17 00:00:00 2001 From: Luis Vasquez Date: Thu, 14 Nov 2024 01:43:04 +0100 Subject: [PATCH 14/25] Add map legend features to API example --- cave_api/cave_api/examples/kitchen_sink.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cave_api/cave_api/examples/kitchen_sink.py b/cave_api/cave_api/examples/kitchen_sink.py index 73fd287..0290e28 100644 --- a/cave_api/cave_api/examples/kitchen_sink.py +++ b/cave_api/cave_api/examples/kitchen_sink.py @@ -904,6 +904,8 @@ def execute_command(session_data, socket, command="init", **kwargs): "longitude": 14, }, }, + "legendView": "full", + "showLegendGroupNames": False, "legendGroups": { "lga": { "name": "Legend Group A", From 45a18366f8a5d4362e9555943663f38e790189a9 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Thu, 14 Nov 2024 13:18:03 -0500 Subject: [PATCH 15/25] Migrate to new URL structure and add support for different ws url paths and ws encodings. --- cave_app/settings/development.py | 14 +++--- cave_app/urls.py | 45 ++++++++++--------- cave_core/admin.py | 2 +- cave_core/management/commands/cache_test.py | 2 +- cave_core/utils/wrapping.py | 2 +- cave_core/views/site_util_views.py | 18 ++++---- cave_core/views/site_views.py | 30 ++++++------- cave_core/websockets/app.py | 2 +- cave_core/websockets/cave_ws_broadcaster.py | 2 +- cave_core/websockets/socket_server.py | 7 ++- requirements.txt | 2 +- templates/app.html | 2 + templates/login.html | 4 +- templates/modules/base/authenticated_nav.html | 10 ++--- .../modules/base/authenticated_nav_pages.html | 6 +-- templates/modules/base/nav_bar.html | 4 +- .../registration/password_reset_done.html | 2 +- .../registration/password_reset_form.html | 2 +- templates/root.html | 2 +- 19 files changed, 80 insertions(+), 78 deletions(-) diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index 7a0401a..7b04200 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -57,7 +57,7 @@ AUTH_USER_MODEL = "cave_core.CustomUser" ## Login/Logout redirection LOGIN_REDIRECT_URL = "app/" -LOGOUT_REDIRECT_URL = "/auth/login/" +LOGOUT_REDIRECT_URL = "/cave/auth/login/" # Django admin authentication information DJANGO_ADMIN_FIRST_NAME = config("DJANGO_ADMIN_FIRST_NAME", default="") DJANGO_ADMIN_LAST_NAME = config("DJANGO_ADMIN_LAST_NAME", default="") @@ -129,8 +129,8 @@ STATIC_ROOT = os.path.join(BASE_DIR, "staticfiles") STATICFILES_DIRS = [os.path.join(BASE_DIR, "static")] MEDIA_ROOT = os.path.join(BASE_DIR, "media") -MEDIA_URL = "/media/" -STATIC_URL = "/static/" +MEDIA_URL = "/cave/media/" +STATIC_URL = "/cave/static/" STATICFILES_STORAGE = "cave_app.storage_backends.StaticStorage" ################################################################ @@ -158,14 +158,10 @@ ################################################################ -# DJANGO_SOCKETS_CONFIG +# DJANGO_SOCKETS ################################################################ INSTALLED_APPS = ["daphne"] + INSTALLED_APPS -DJANGO_SOCKETS_CONFIG = { - "hosts": [ - {"address": f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}"} - ], -} +DJANGO_SOCKET_HOSTS = [{"address": f"redis://{os.environ.get('REDIS_HOST')}:{os.environ.get('REDIS_PORT')}"}] ################################################################ diff --git a/cave_app/urls.py b/cave_app/urls.py index 2a36907..e2485ab 100755 --- a/cave_app/urls.py +++ b/cave_app/urls.py @@ -11,49 +11,50 @@ urlpatterns = [ # Main Pages - path("", site_views.root_view), - path("app/info/", site_views.info), - path("app/page/", site_views.page), - path("app/people/", site_views.people), - path("app/workspace/", site_views.workspace), - path("app/profile/", site_views.profile), + path("cave/", site_views.root_view), + path("cave/info/", site_views.info), + path("cave/page/", site_views.page), + path("cave/people/", site_views.people), + path("cave/workspace/", site_views.workspace), + path("cave/profile/", site_views.profile), # Util Pages - path("app/", site_util_views.app_router), + path("cave/router/", site_util_views.app_router), # General API Pages - path("app/health/", api_util_views.health), - path("app/custom_pages/", api_util_views.custom_pages), + path("cave/health/", api_util_views.health), + path("cave/custom_pages/", api_util_views.custom_pages), # User Authentication - path("auth/login/", site_util_views.login_view), - path("auth/signup/", site_util_views.signup), - path("auth/logout/", site_util_views.user_logout), - path("auth/validate_email/", site_util_views.validate_email), - path("auth/send_email_validation_code/", api_util_views.send_email_validation_code), - path("auth/change_password/", site_util_views.change_password), + path("cave/auth/login/", site_util_views.login_view), + path("cave/auth/signup/", site_util_views.signup), + path("cave/auth/logout/", site_util_views.user_logout), + path("cave/auth/validate_email/", site_util_views.validate_email), + path("cave/auth/send_email_validation_code/", api_util_views.send_email_validation_code), + path("cave/auth/change_password/", site_util_views.change_password), # Password Reset (auth_views uses names for url navs) path( - "auth/password_reset/", + "cave/auth/password_reset/", auth_views.PasswordResetView.as_view(extra_context=url_helpers.get_extra_content()), name="password_reset", ), path( - "auth/password_reset_done/", + "cave/auth/password_reset_done/", auth_views.PasswordResetDoneView.as_view(extra_context=url_helpers.get_extra_content()), name="password_reset_done", ), path( - "auth///", + "cave/auth///", auth_views.PasswordResetConfirmView.as_view(extra_context=url_helpers.get_extra_content()), name="password_reset_confirm", ), path( - "auth/password_reset_complete/", - RedirectView.as_view(url="/auth/login/", permanent=False), + "cave/auth/password_reset_complete/", + RedirectView.as_view(url="/cave/auth/login/", permanent=False), name="password_reset_complete", ), # Admin site - path("app/admin/", admin.site.urls), - path("app/staff/", staff_site.urls), + path("cave/admin/", admin.site.urls), + path("cave/staff/", staff_site.urls), ] if settings.DEBUG: urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) + urlpatterns += [path("", RedirectView.as_view(url="/cave/", permanent=False))] diff --git a/cave_core/admin.py b/cave_core/admin.py index c27f32b..77a1cf0 100755 --- a/cave_core/admin.py +++ b/cave_core/admin.py @@ -5,7 +5,7 @@ from solo.admin import SingletonModelAdmin # Admin site attributes -admin.site.site_url = "/app/" +admin.site.site_url = "/cave/" admin.site.site_title = "CAVE App Admin Site" admin.site.site_header = "Admin" admin.site.index_title = "CAVE App" diff --git a/cave_core/management/commands/cache_test.py b/cave_core/management/commands/cache_test.py index e14c954..84888b5 100644 --- a/cave_core/management/commands/cache_test.py +++ b/cave_core/management/commands/cache_test.py @@ -17,7 +17,7 @@ async def send(ws_data): # Test the socket server cache process base_receive = asyncio.Queue() - base_socket_server = BaseSocketServer(scope={}, receive=base_receive.get, send=send, config=settings.DJANGO_SOCKETS_CONFIG) + base_socket_server = BaseSocketServer(scope={}, receive=base_receive.get, send=send, hosts=settings.DJANGO_SOCKET_HOSTS) base_socket_server.start_listeners() base_socket_server.subscribe("test_channel") # Small message diff --git a/cave_core/utils/wrapping.py b/cave_core/utils/wrapping.py index 282e006..9c7eb37 100755 --- a/cave_core/utils/wrapping.py +++ b/cave_core/utils/wrapping.py @@ -31,7 +31,7 @@ def redirect_logged_in_user(fn): @wraps(fn) def wrap(request): if request.user.is_authenticated: - return redirect("/app/") + return redirect("/cave/router/") return fn(request) return wrap diff --git a/cave_core/views/site_util_views.py b/cave_core/views/site_util_views.py index 2274a66..4e75b4f 100644 --- a/cave_core/views/site_util_views.py +++ b/cave_core/views/site_util_views.py @@ -13,7 +13,7 @@ # Views -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def app_router(request): """ App Router View @@ -22,11 +22,11 @@ def app_router(request): """ globals = models.Globals.get_solo() if globals.show_app_page and request.user.get_access_dict().get("status") == 'accepted': - return redirect("/app/workspace/") - return redirect("/app/info/") + return redirect("/cave/workspace/") + return redirect("/cave/info/") -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def change_password(request): """ Change password view @@ -38,7 +38,7 @@ def change_password(request): if form.is_valid(): user = form.save() update_session_auth_hash(request, user) # Important! - return redirect("/app/") + return redirect("/cave/router/") else: form = PasswordChangeForm(request.user) return render( @@ -62,7 +62,7 @@ def signup(request): """ globals = models.Globals.get_solo() if not globals.allow_anyone_create_user: - return redirect("/auth/login/") + return redirect("/cave/auth/login/") if request.method == "POST": form = forms.CreateUserForm(request.POST) if form.is_valid(): @@ -71,7 +71,7 @@ def signup(request): raw_password = form.cleaned_data.get("password1") user = authenticate(username=username, password=raw_password) login(request, user) - return redirect("/app/") + return redirect("/cave/router/") else: form = forms.CreateUserForm() return render( @@ -103,7 +103,7 @@ def login_view(request): login(request, user) next_url = request.POST.get("next_url") if next_url == "None" or len(next_url) == 0: - next_url = "/app/" + next_url = "/cave/router/" # Ensure next url ends with a trailing slash if next_url[-1] != "/": next_url += "/" @@ -145,7 +145,7 @@ def validate_email(request): user.email_validated = True user.email_validation_code = None user.save() - return redirect("/app/") + return redirect("/cave/info/") globals = models.Globals.get_solo() return render( request, diff --git a/cave_core/views/site_views.py b/cave_core/views/site_views.py index 6d17d66..2482228 100755 --- a/cave_core/views/site_views.py +++ b/cave_core/views/site_views.py @@ -30,7 +30,7 @@ def root_view(request): ) -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def info(request): """ Info View @@ -57,7 +57,7 @@ def info(request): ) -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def page(request): """ Generic page view @@ -69,9 +69,9 @@ def page(request): globals = models.Globals.get_solo() page = models.Pages.objects.filter(show=True, url_name=request.GET.get("page")).first() if page == None: - return redirect("/app/") + return redirect("/cave/info/") if (not request.user.has_access()) and page.require_access: - return redirect("/app/") + return redirect("/cave/info/") return render( request, "generic.html", @@ -85,10 +85,10 @@ def page(request): }, ) else: - return redirect("/app/") + return redirect("/cave/info/") -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def people(request): """ People view @@ -98,7 +98,7 @@ def people(request): # print("\n\nPeople\n") globals = models.Globals.get_solo() if not request.user.has_access() or not globals.show_people_page: - return redirect("/app/") + return redirect("/cave/info/") if request.method == "GET": return render( request, @@ -111,10 +111,10 @@ def people(request): }, ) else: - return redirect("/app/") + return redirect("/cave/info/") -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def workspace(request): """ Workspace view @@ -124,9 +124,9 @@ def workspace(request): # print("\n\nApp\n") globals = models.Globals.get_solo() if not request.user.has_access(): - return redirect("/app/") + return redirect("/cave/info/") if not globals.show_app_page: - return redirect("/app/") + return redirect("/cave/info/") if request.method == "GET": appResponse = render( request, @@ -144,10 +144,10 @@ def workspace(request): appResponse["Cross-Origin-Opener-Policy"] = "same-origin" return appResponse else: - return redirect("/app/") + return redirect("/cave/info/") -@login_required(login_url="/auth/login/") +@login_required(login_url="/cave/auth/login/") def profile(request): """ User profile view @@ -156,13 +156,13 @@ def profile(request): """ globals = models.Globals.get_solo() if not globals.allow_user_edit_info: - return redirect("/app/") + return redirect("/cave/info/") UpdateUserForm = forms.UpdateUserForm(globals) if request.method == "POST": form = UpdateUserForm(request.POST, request.FILES, instance=request.user) if form.is_valid(): user = form.save() - return redirect("/app/profile/") + return redirect("/cave/profile/") else: form = UpdateUserForm(instance=request.user) return render( diff --git a/cave_core/websockets/app.py b/cave_core/websockets/app.py index 9477737..55ecbea 100644 --- a/cave_core/websockets/app.py +++ b/cave_core/websockets/app.py @@ -4,7 +4,7 @@ from .socket_server import SocketServer websocket_urlpatterns = [ - path("ws/", SocketServer.as_asgi), + path("cave/ws/", SocketServer.as_asgi), ] def get_ws_asgi_application(): diff --git a/cave_core/websockets/cave_ws_broadcaster.py b/cave_core/websockets/cave_ws_broadcaster.py index c1215f5..902fbdc 100755 --- a/cave_core/websockets/cave_ws_broadcaster.py +++ b/cave_core/websockets/cave_ws_broadcaster.py @@ -2,7 +2,7 @@ import type_enforced from django_sockets.broadcaster import Broadcaster -broadcaster = Broadcaster(config=settings.DJANGO_SOCKETS_CONFIG) +broadcaster = Broadcaster(hosts=settings.DJANGO_SOCKET_HOSTS) # Constants acceptable_events = set( diff --git a/cave_core/websockets/socket_server.py b/cave_core/websockets/socket_server.py index fc0dfa6..b910d96 100644 --- a/cave_core/websockets/socket_server.py +++ b/cave_core/websockets/socket_server.py @@ -2,6 +2,7 @@ from .commands import get_command from django_sockets.sockets import BaseSocketServer +import msgpack class Request: """ @@ -12,8 +13,10 @@ def __init__(self, user, data): self.user = user class SocketServer(BaseSocketServer): - def get_config(self): - return settings.DJANGO_SOCKETS_CONFIG + def configure(self): + self.hosts = settings.DJANGO_SOCKET_HOSTS + self.ws_encoder = msgpack.packb + self.ws_encoder_is_bytes = True def receive(self, data): if settings.DEBUG: diff --git a/requirements.txt b/requirements.txt index 8dacd25..8269b58 100755 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ daphne==4.1.2 Django==5.1.1 django-cors-headers==4.4.0 django-import-export==4.1.1 -django_sockets==1.2.0 +django_sockets==2.0.0b2 django-solo==2.3.0 djangorestframework==3.15.2 msgpack==1.1.0 diff --git a/templates/app.html b/templates/app.html index fe0ebc7..ed9196a 100755 --- a/templates/app.html +++ b/templates/app.html @@ -48,6 +48,8 @@ data:{ user_token: '{{ user_token }}', mapbox_token: '{{ globals.mapbox_token }}', + ws_path: '/cave/ws/', + ws_encoding: 'msgpack' } }) } diff --git a/templates/login.html b/templates/login.html index 51f223b..88f939a 100755 --- a/templates/login.html +++ b/templates/login.html @@ -13,9 +13,9 @@

Sign In

{% if globals.allow_anyone_create_user %} - + {% endif %} - +
diff --git a/templates/modules/base/authenticated_nav.html b/templates/modules/base/authenticated_nav.html index 110ccdc..3476077 100755 --- a/templates/modules/base/authenticated_nav.html +++ b/templates/modules/base/authenticated_nav.html @@ -5,9 +5,9 @@ {% if access_dict %} @@ -16,21 +16,21 @@ {% if user.is_staff %} {% if user.is_superuser %} {% else %} {% endif %} {% endif %} - diff --git a/templates/modules/base/authenticated_nav_pages.html b/templates/modules/base/authenticated_nav_pages.html index e351cd1..5568206 100755 --- a/templates/modules/base/authenticated_nav_pages.html +++ b/templates/modules/base/authenticated_nav_pages.html @@ -1,14 +1,14 @@
@@ -29,7 +29,7 @@ {% if globals.site_logo %} {{ globals.site_name }} {% else %} - {{ globals.site_name }} + {{ globals.site_name }} {% endif %}
diff --git a/templates/registration/password_reset_done.html b/templates/registration/password_reset_done.html index cf07b6f..18a22ce 100755 --- a/templates/registration/password_reset_done.html +++ b/templates/registration/password_reset_done.html @@ -12,7 +12,7 @@

Profile Recovery

- +
diff --git a/templates/registration/password_reset_form.html b/templates/registration/password_reset_form.html index 189740a..ddac5fd 100755 --- a/templates/registration/password_reset_form.html +++ b/templates/registration/password_reset_form.html @@ -13,7 +13,7 @@

Account Recovery

- +
diff --git a/templates/root.html b/templates/root.html index 48b7e22..33fbda6 100755 --- a/templates/root.html +++ b/templates/root.html @@ -42,7 +42,7 @@

Click To Get Started!

window.addEventListener('resize', setAppDivWidth); window.addEventListener('load', setAppDivWidth); document.querySelector('.bg-site-background').addEventListener('click', () => { - window.location.href = '/auth/login'; + window.location.href = '/cave/auth/login'; }); {% endblock %} From 8053cb1ec75b90335dffe8c57e7d210cd45925eb Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Thu, 14 Nov 2024 13:32:27 -0500 Subject: [PATCH 16/25] Omit background api keys when calling execute command. --- cave_core/models.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cave_core/models.py b/cave_core/models.py index ee7266b..668a286 100755 --- a/cave_core/models.py +++ b/cave_core/models.py @@ -1035,7 +1035,7 @@ def set_versions(self, versions:dict) -> None: cache.set(f"session:{self.id}:versions", versions) self.__dict__['versions'] = versions - def get_data(self, keys:list[str]=None, client_only:bool=True) -> dict: + def get_data(self, keys:list[str]=None, client_only:bool=True, omit_keys=list()) -> dict: """ Returns all data for this session @@ -1050,6 +1050,11 @@ def get_data(self, keys:list[str]=None, client_only:bool=True) -> dict: - Type: bool - What: If True, only relevant client keys are returned - Default: True + - `omit_keys`: + - Type: list of strings + - What: The keys to omit from the data + - Default: [] + - Note: If None, no keys are omitted Returns: - Type: dict @@ -1059,6 +1064,8 @@ def get_data(self, keys:list[str]=None, client_only:bool=True) -> dict: keys = list(self.get_versions().keys()) if client_only: keys = pamda.intersection(keys, api_keys) + if len(omit_keys) > 0: + keys = pamda.difference(keys, omit_keys) keys_to_get_from_cache = [] for key in keys: # Avoid additional cache hits by checking if the data is already in the session __dict__ @@ -1207,7 +1214,7 @@ def execute_api_command( """ # print('\n==EXECUTE API COMMAND==') self.set_loading(True) - session_data = self.get_data(keys=command_keys, client_only=False) + session_data = self.get_data(keys=command_keys, client_only=False, omit_keys=background_api_keys) socket = CaveWSBroadcaster(self) command_output = execute_command( session_data=session_data, command=command, socket=socket, mutate_dict=mutate_dict From a8a46df434cd27fdccb74ac49450f1bfbeaf0f86 Mon Sep 17 00:00:00 2001 From: Connor Makowski Date: Thu, 14 Nov 2024 15:15:52 -0500 Subject: [PATCH 17/25] Fix broken URLs --- cave_app/settings/development.py | 1 - templates/modules/utils/custom_page_script.html | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cave_app/settings/development.py b/cave_app/settings/development.py index 7b04200..fb52097 100755 --- a/cave_app/settings/development.py +++ b/cave_app/settings/development.py @@ -56,7 +56,6 @@ ## Custom Users Model AUTH_USER_MODEL = "cave_core.CustomUser" ## Login/Logout redirection -LOGIN_REDIRECT_URL = "app/" LOGOUT_REDIRECT_URL = "/cave/auth/login/" # Django admin authentication information DJANGO_ADMIN_FIRST_NAME = config("DJANGO_ADMIN_FIRST_NAME", default="") diff --git a/templates/modules/utils/custom_page_script.html b/templates/modules/utils/custom_page_script.html index 1d69ad0..b37625d 100755 --- a/templates/modules/utils/custom_page_script.html +++ b/templates/modules/utils/custom_page_script.html @@ -1,7 +1,7 @@