From 0ded3f3606c2ef9e08aafca3c8e2230c85868a3e Mon Sep 17 00:00:00 2001 From: Tristiisch Date: Fri, 27 Sep 2024 00:51:52 +0200 Subject: [PATCH] feat: socket server as a service --- .vscode/launch.json | 16 +++ Dockerfile | 6 +- Makefile | 16 ++- docker-compose.yml | 1 + src/pyramid/api/services/__init__.py | 1 - src/pyramid/api/services/socket_server.py | 12 +- src/pyramid/api/services/tools/register.py | 13 +- src/pyramid/api/tasks/tools/register.py | 1 + src/{cli.py => pyramid/cli/startup.py} | 12 +- src/pyramid/client/client.py | 16 +-- src/pyramid/client/common.py | 9 +- src/pyramid/client/requests/a_request.py | 4 +- src/pyramid/client/requests/health.py | 27 ---- src/pyramid/client/requests/ping.py | 26 ++++ src/pyramid/client/responses/a_response.py | 17 ++- .../client/responses/a_response_header.py | 12 ++ src/pyramid/client/server.py | 98 -------------- src/pyramid/data/functional/main.py | 4 - src/pyramid/data/health.py | 7 - src/pyramid/data/ping.py | 6 + src/pyramid/services/configuration.py | 2 +- src/pyramid/services/logger.py | 4 + src/pyramid/services/socket_server.py | 120 +++++++++++++++--- src/pyramid/tasks/discord.py | 7 +- src/pyramid/tasks/socket_server.py | 17 +++ src/pyramid/tools/custom_queue.py | 2 +- src/startup_cli.py | 4 + src/startup_cli_dev.py | 10 ++ src/{dev.py => startup_dev.py} | 0 29 files changed, 266 insertions(+), 204 deletions(-) rename src/{cli.py => pyramid/cli/startup.py} (87%) delete mode 100644 src/pyramid/client/requests/health.py create mode 100644 src/pyramid/client/requests/ping.py create mode 100644 src/pyramid/client/responses/a_response_header.py delete mode 100644 src/pyramid/client/server.py delete mode 100644 src/pyramid/data/health.py create mode 100644 src/pyramid/data/ping.py create mode 100644 src/pyramid/tasks/socket_server.py create mode 100644 src/startup_cli.py create mode 100644 src/startup_cli_dev.py rename src/{dev.py => startup_dev.py} (100%) diff --git a/.vscode/launch.json b/.vscode/launch.json index 8ad9737..92e4591 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -35,6 +35,22 @@ } ], "justMyCode": false + }, + { + "name": "Python: Remote Attach CLI", + "type": "python", + "request": "attach", + "connect": { + "host": "localhost", + "port": 5679 + }, + "pathMappings": [ + { + "localRoot": "${workspaceFolder}", + "remoteRoot": "/app" + } + ], + "justMyCode": false } ] } diff --git a/Dockerfile b/Dockerfile index bb76f8c..19e0318 100644 --- a/Dockerfile +++ b/Dockerfile @@ -46,7 +46,7 @@ LABEL org.opencontainers.image.source="https://github.com/tristiisch/PyRamid" \ org.opencontainers.image.authors="tristiisch" \ version="$PROJECT_VERSION" -HEALTHCHECK --interval=30s --retries=3 --timeout=30s CMD python ./src/cli.py health +# HEALTHCHECK --interval=30s --retries=3 --timeout=30s CMD python ./src/startup_cli.py health # Expose port for health check EXPOSE 49150 @@ -101,6 +101,8 @@ FROM base AS executable-dev ARG APP_USER ARG APP_GROUP +# HEALTHCHECK --interval=30s --retries=3 --timeout=30s CMD python -Xfrozen_modules=off ./src/startup_cli_dev.py health + COPY --chown=root:$APP_GROUP --chmod=550 --from=builder-dev /opt/venv /opt/venv ENV PATH="/opt/venv/bin:$PATH" @@ -108,7 +110,7 @@ COPY --chown=root:$APP_GROUP --chmod=750 ./src ./src USER $APP_USER -CMD ["python", "-Xfrozen_modules=off", "./src/dev.py"] +CMD ["python", "-Xfrozen_modules=off", "./src/startup_dev.py"] # ============================ Test Image ============================ FROM base AS tests diff --git a/Makefile b/Makefile index fa411bf..2102d7e 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ DOCKER_COMPOSE_FILE_PREPROD := docker-compose.preprod.yml DOCKER_SERVICE_PREPROD := pyramid_preprod_pyramid DOCKER_CONTEXT_PREPROD := cookie-pulsheberg -.PHONY: logs +# Basics all: up-b logs @@ -45,6 +45,8 @@ logs: exec: @docker compose exec $(COMPOSE_SERVICE) sh +# Other envs + exec-pp: @scripts/docker_service_exec.sh $(DOCKER_SERVICE_PREPROD) $(DOCKER_CONTEXT_PREPROD) @@ -56,6 +58,14 @@ tests: @mkdir -p ./cover && chmod 777 ./cover @docker run --rm --env-file ./.env -v ./cover:/app/cover pyramid:tests +healthcheck: + @docker compose exec $(COMPOSE_SERVICE) sh -c "python ./src/startup_cli.py health" + +healthcheck-dev: + @docker compose exec $(COMPOSE_SERVICE) sh -c "python -Xfrozen_modules=off ./src/startup_cli_dev.py health" + +# Pythons scripts + img-b: @python scripts/environnement.py --build @@ -68,4 +78,6 @@ img-c: clean: @python scripts/environnement.py --clean -.PHONY: build tests \ No newline at end of file +# Other + +.PHONY: build tests logs diff --git a/docker-compose.yml b/docker-compose.yml index fae62fd..ba24af0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,7 @@ services: - pyramid_network ports: - 5678:5678 + - 5679:5679 env_file: .env networks: diff --git a/src/pyramid/api/services/__init__.py b/src/pyramid/api/services/__init__.py index 84fbfed..fc1e4f4 100644 --- a/src/pyramid/api/services/__init__.py +++ b/src/pyramid/api/services/__init__.py @@ -2,5 +2,4 @@ from .discord import IDiscordService from .information import IInformationService from .logger import ILoggerService -from .socket_server import ISocketServerService # from .source_service import ISourceService diff --git a/src/pyramid/api/services/socket_server.py b/src/pyramid/api/services/socket_server.py index eb35498..06961ac 100644 --- a/src/pyramid/api/services/socket_server.py +++ b/src/pyramid/api/services/socket_server.py @@ -1,8 +1,12 @@ -from abc import ABC, abstractmethod -from pyramid.data.functional.application_info import ApplicationInfo +from abc import abstractmethod -class ISocketServerService(ABC): + +class ISocketServerService: + + @abstractmethod + async def open(self): + pass @abstractmethod - def start(self): + def close(self): pass diff --git a/src/pyramid/api/services/tools/register.py b/src/pyramid/api/services/tools/register.py index 3932d65..dbae533 100644 --- a/src/pyramid/api/services/tools/register.py +++ b/src/pyramid/api/services/tools/register.py @@ -24,16 +24,17 @@ def import_services(cls): importlib.import_module(full_module_name) @classmethod - def register_service(cls, name: str, type: type[object]): + def register_service(cls, interface_name: str, type: type[object]): + type_name = type.__name__ if not issubclass(type, ServiceInjector): - raise TypeError("Service %s is not a subclass of ServiceInjector and cannot be initialized." % name) - if name in cls.__SERVICES_REGISTRED: - already_class_name = cls.__SERVICES_REGISTRED[name].__name__ + raise TypeError("Service %s is not a subclass of ServiceInjector and cannot be initialized." % type_name) + if interface_name in cls.__SERVICES_REGISTRED: + already_class_name = cls.__SERVICES_REGISTRED[interface_name].__name__ raise ServiceAlreadyRegisterException( "Cannot register %s with %s, it is already registered with the class %s." - % (name, type.__name__, already_class_name) + % (interface_name, type_name, already_class_name) ) - cls.__SERVICES_REGISTRED[name] = type + cls.__SERVICES_REGISTRED[interface_name] = type @classmethod def create_services(cls): diff --git a/src/pyramid/api/tasks/tools/register.py b/src/pyramid/api/tasks/tools/register.py index 44d6bd3..aaa1e95 100644 --- a/src/pyramid/api/tasks/tools/register.py +++ b/src/pyramid/api/tasks/tools/register.py @@ -82,3 +82,4 @@ def running(loop: asyncio.AbstractEventLoop): parameters.thread.join() signal.signal(signal.SIGTERM, previous_handler) + logging.info("All tasks are stopped") diff --git a/src/cli.py b/src/pyramid/cli/startup.py similarity index 87% rename from src/cli.py rename to src/pyramid/cli/startup.py index 1330ce7..b2d83a2 100644 --- a/src/cli.py +++ b/src/pyramid/cli/startup.py @@ -1,9 +1,10 @@ import argparse +import sys from pyramid.api.services.information import IInformationService from pyramid.api.services.tools.tester import ServiceStandalone from pyramid.client.client import SocketClient -from pyramid.client.requests.health import HealthRequest +from pyramid.client.requests.ping import PingRequest def startup_cli(): ServiceStandalone.import_services() @@ -29,11 +30,10 @@ def startup_cli(): elif args.health: sc = SocketClient(args.host, args.port) - health = HealthRequest() - sc.send(health) + health = PingRequest() + result = sc.send(health) + if result is not True: + sys.exit(1) else: parser.print_help() - -if __name__ == "__main__": - startup_cli() diff --git a/src/pyramid/client/client.py b/src/pyramid/client/client.py index 32be670..81e0c48 100644 --- a/src/pyramid/client/client.py +++ b/src/pyramid/client/client.py @@ -27,7 +27,7 @@ def send(self, req: ARequest) -> bool: try: # Convert the request data to JSON - json_request = self.__common.serialize(req.ask) + json_request = SocketCommon.serialize(req.ask) # Connect to the server self.__logger.info("Connect to %s:%d", self.__host, self.__port) @@ -35,17 +35,16 @@ def send(self, req: ARequest) -> bool: # Send the JSON request to the server self.__logger.debug("Send '%s'", json_request) - self.__common.send_chunk(client_socket, json_request) + SocketCommon.send_chunk(client_socket, json_request) # Receive the response from the server response_str = self.__common.receive_chunk(client_socket) if not response_str: self.__logger.warning("Received empty request") return False - self.__logger.debug("Received '%s'", response_str) - self.receive(req, response_str) - return True + result = self.receive(req, response_str) + return result except OverflowError: self.__logger.warning( @@ -67,8 +66,8 @@ def send(self, req: ARequest) -> bool: client_socket.close() return False - def receive(self, action: ARequest, response_str: str): - response: SocketResponse = SocketResponse.from_json(self.__common.deserialize(response_str)) + def receive(self, action: ARequest, response_str: str) -> bool: + response: SocketResponse = SocketResponse.from_str(response_str) if not response.header: raise ValueError("No header received") @@ -76,4 +75,5 @@ def receive(self, action: ARequest, response_str: str): raise ValueError("No data received") response_data = action.load_data(**(self.__common.deserialize(response.data))) - action.client_receive(response.header, response_data) + result = action.client_receive(response.header, response_data) + return result diff --git a/src/pyramid/client/common.py b/src/pyramid/client/common.py index ddf3e2e..72ce430 100644 --- a/src/pyramid/client/common.py +++ b/src/pyramid/client/common.py @@ -28,14 +28,16 @@ def receive_chunk(self, client_socket: sock): return None return received_data.decode("utf-8") - def send_chunk(self, client_socket: sock, response: str): + @classmethod + def send_chunk(cls, client_socket: sock, response: str): # for chunk in [ # response[i : i + self.buffer_size] for i in range(0, len(response), self.buffer_size) # ]: # client_socket.send(chunk.encode("utf-8")) client_socket.send(response.encode("utf-8")) - def serialize(self, obj): + @classmethod + def serialize(cls, obj): def default(obj): if hasattr(obj, "__dict__"): # if isinstance(obj, SocketResponse): @@ -46,7 +48,8 @@ def default(obj): return json.dumps(obj, default=default) - def deserialize(self, obj: str, object_hook=None): + @classmethod + def deserialize(cls, obj: str, object_hook=None): return json.loads(obj, object_hook=object_hook) diff --git a/src/pyramid/client/requests/a_request.py b/src/pyramid/client/requests/a_request.py index aa393bf..6ad9b1b 100644 --- a/src/pyramid/client/requests/a_request.py +++ b/src/pyramid/client/requests/a_request.py @@ -2,9 +2,9 @@ from typing import Any # from pyramid.client.common import SocketHeader -from pyramid.client.responses.a_response import ResponseHeader from pyramid.client.a_socket import ASocket from pyramid.client.requests.ask_request import AskRequest +from pyramid.client.responses.a_response_header import ResponseHeader class ARequest(ASocket): @@ -17,5 +17,5 @@ def load_data(self, data) -> Any: pass @abstractmethod - def client_receive(self, header: ResponseHeader, data: Any): + def client_receive(self, header: ResponseHeader, data: Any) -> bool: pass diff --git a/src/pyramid/client/requests/health.py b/src/pyramid/client/requests/health.py deleted file mode 100644 index ba82f1d..0000000 --- a/src/pyramid/client/requests/health.py +++ /dev/null @@ -1,27 +0,0 @@ -import json -import logging -import sys -from pyramid.client.requests.a_request import ARequest -from pyramid.client.requests.ask_request import AskRequest -from pyramid.client.responses.a_response import ResponseHeader -from pyramid.data.health import HealthModules - - -class HealthRequest(ARequest): - def __init__(self) -> None: - super().__init__(AskRequest("health")) - - def load_data(self, **data) -> HealthModules: - return HealthModules(**data) - - def client_receive(self, header: ResponseHeader, data: HealthModules): - data_json = json.dumps(data.__dict__, indent=4) - - if not data.is_ok(): - logging.warn("Health check failed") - print(data_json) - sys.exit(1) - else: - logging.info("Health check valid") - print(data_json) - sys.exit(0) diff --git a/src/pyramid/client/requests/ping.py b/src/pyramid/client/requests/ping.py new file mode 100644 index 0000000..0c47434 --- /dev/null +++ b/src/pyramid/client/requests/ping.py @@ -0,0 +1,26 @@ +import json +import logging +import sys +from pyramid.client.requests.a_request import ARequest +from pyramid.client.requests.ask_request import AskRequest +from pyramid.client.responses.a_response_header import ResponseHeader +from pyramid.data.ping import PingSocket + + +class PingRequest(ARequest): + def __init__(self) -> None: + super().__init__(AskRequest("health")) + + def load_data(self, **data) -> PingSocket: + return PingSocket(**data) + + def client_receive(self, header: ResponseHeader, data: PingSocket) -> bool: + data_json = json.dumps(data.__dict__, indent=4) + + if not data.is_ok(): + logging.warning("Health check failed") + print(data_json) + return False + logging.info("Health check valid") + print(data_json) + return True diff --git a/src/pyramid/client/responses/a_response.py b/src/pyramid/client/responses/a_response.py index 9bf40f7..48ee1e0 100644 --- a/src/pyramid/client/responses/a_response.py +++ b/src/pyramid/client/responses/a_response.py @@ -1,17 +1,10 @@ from typing import Any, Self from pyramid.client.a_socket import ASocket -from pyramid.client.common import ResponseCode +from pyramid.client.common import ResponseCode, SocketCommon +from pyramid.client.responses.a_response_header import ResponseHeader # from pyramid.client.common import ResponseCode, SocketHeader -# class ReponseHeader(SocketHeader): -class ResponseHeader: - def __init__(self, code: ResponseCode, message: str | None) -> None: - # super().__init__(self.__class__) - self.code = code - self.message = message - - class SocketResponse(ASocket): def __init__( self, @@ -45,6 +38,12 @@ def to_json(self, serializer): "error_data": serializer(self.error_data) if self.error_data else None, } + @classmethod + def from_str(cls, data: str) -> Self: + json = SocketCommon.deserialize(data) + self = cls.from_json(json) + return self + @classmethod def from_json(cls, json_dict: dict) -> Self: self: Self = cls() diff --git a/src/pyramid/client/responses/a_response_header.py b/src/pyramid/client/responses/a_response_header.py new file mode 100644 index 0000000..9c6910d --- /dev/null +++ b/src/pyramid/client/responses/a_response_header.py @@ -0,0 +1,12 @@ +from typing import Any, Self +from pyramid.client.a_socket import ASocket +from pyramid.client.common import ResponseCode, SocketCommon +# from pyramid.client.common import ResponseCode, SocketHeader + + +# class ReponseHeader(SocketHeader): +class ResponseHeader: + def __init__(self, code: ResponseCode, message: str | None) -> None: + # super().__init__(self.__class__) + self.code = code + self.message = message diff --git a/src/pyramid/client/server.py b/src/pyramid/client/server.py deleted file mode 100644 index 7b73eca..0000000 --- a/src/pyramid/client/server.py +++ /dev/null @@ -1,98 +0,0 @@ -import logging -import socket -from logging import Logger -from socket import socket as sock -from typing import Any - -from pyramid.client.common import ResponseCode, SocketCommon -from pyramid.client.requests.ask_request import AskRequest -from pyramid.client.responses.a_response import SocketResponse -from pyramid.data.health import HealthModules - -# from _socket import _RetAddress - - -class SocketServer: - def __init__(self, logger: Logger, health: HealthModules, host: str = "0.0.0.0") -> None: - self.__common = SocketCommon() - self.__host = host - self.__port = self.__common.port - self.__health = health - if logger: - self.__logger = logger - else: - logger = logging.getLogger("socket") - - def start_server(self): - # Set the host and port for the socket server - - # Create a socket object - server_socket = sock(socket.AF_INET, socket.SOCK_STREAM) - # Bind the socket to a specific address and port - server_socket.bind((self.__host, self.__port)) - # Listen for incoming connections (up to x connections in the queue) - server_socket.listen(10) - - self.__logger.info("Socket server open on %s:%d", self.__host, self.__port) - - client_socket: sock - client_address: Any - while True: - # Wait for a connection from a client - client_socket, client_address = server_socket.accept() - client_ip = client_address[0] - client_port = client_address[1] - # self.__logger.debug("[%s:%d] accepted", client_ip, client_port) - - try: - response_to_send = self.handle_client(client_socket, client_ip, client_port) - - if response_to_send: - # Convert the response data to JSON - response_json = self.__common.serialize( - response_to_send.to_json(self.__common.serialize) - ) - - # Send the JSON response back to the client - # self.__logger.debug("[%s:%d] <- %s", client_ip, client_port, response_json) - self.__common.send_chunk(client_socket, response_json) - except Exception as err: - self.__logger.warning("[%s:%d] %s", client_ip, client_port, err, exc_info=True) - finally: - client_socket.close() - - def handle_client(self, client_socket: sock, client_ip, client_port) -> SocketResponse | None: - # Receive data from the client - data = self.__common.receive_chunk(client_socket) - - if not data: - self.__logger.info("[%s:%d] -> ", client_ip, client_port) - return - - # self.__logger.debug("[%s:%d] -> %s", client_ip, client_port, data) - - def object_hook(json): - if isinstance(json, dict): - return AskRequest(**json) - return json - - json_data: AskRequest = self.__common.deserialize(data, object_hook=object_hook) - - response = SocketResponse() - - # Check the content of the JSON data - if not json_data.action: - response.create(ResponseCode.ERROR, "Missing action field in JSON data") - return response - - if json_data.action == "health": - data = self.__health - response.create(ResponseCode.OK, None, data) - return response - - # If the action is unknown, respond with an error message - response.create(ResponseCode.ERROR, "Unknown action") - self.__logger.info( - "[%s:%d] <- Unknown action '%s'", client_ip, client_port, json_data.action - ) - return response diff --git a/src/pyramid/data/functional/main.py b/src/pyramid/data/functional/main.py index d6af0e0..d5a48a4 100644 --- a/src/pyramid/data/functional/main.py +++ b/src/pyramid/data/functional/main.py @@ -35,10 +35,6 @@ def start(self): ServiceRegister.inject_services() ServiceRegister.start_services() - logger = ServiceRegister.get_service(ILoggerService) - - logger.debug(ServiceRegister.get_dependency_tree()) - MainQueue.init() TaskRegister.import_tasks() diff --git a/src/pyramid/data/health.py b/src/pyramid/data/health.py deleted file mode 100644 index 0990056..0000000 --- a/src/pyramid/data/health.py +++ /dev/null @@ -1,7 +0,0 @@ -class HealthModules: - def __init__(self, configuration: bool = False, discord: bool = False) -> None: - self.configuration = configuration - self.discord = discord - - def is_ok(self) -> bool: - return self.configuration and self.discord diff --git a/src/pyramid/data/ping.py b/src/pyramid/data/ping.py new file mode 100644 index 0000000..175a493 --- /dev/null +++ b/src/pyramid/data/ping.py @@ -0,0 +1,6 @@ +class PingSocket: + def __init__(self, ok: bool): + self.ok = ok + + def is_ok(self) -> bool: + return self.ok diff --git a/src/pyramid/services/configuration.py b/src/pyramid/services/configuration.py index 9435119..4a96f7f 100644 --- a/src/pyramid/services/configuration.py +++ b/src/pyramid/services/configuration.py @@ -29,7 +29,7 @@ def load(self, config_file: str = "config.yml", use_env_vars: bool = True) -> bo raw_values_env, result_values, "env vars", True, keys_length ) - # Load raw values from environment variables and config file + # Load raw values from config file try: raw_values_file = self._get_file_vars(config_file) result_values = self._transform_all(raw_values_file, keys_length) diff --git a/src/pyramid/services/logger.py b/src/pyramid/services/logger.py index adcded4..32f5eb0 100644 --- a/src/pyramid/services/logger.py +++ b/src/pyramid/services/logger.py @@ -45,6 +45,10 @@ def start(self): def critical(self, msg, *args, **kwargs): self.logger.critical(msg, *args, **kwargs) + sys.exit(1) + + def exception(self, msg, *args, **kwargs): + self.logger.exception(msg, *args, **kwargs) def error(self, msg, *args, **kwargs): self.logger.error(msg, *args, **kwargs) diff --git a/src/pyramid/services/socket_server.py b/src/pyramid/services/socket_server.py index 5adf270..a559760 100644 --- a/src/pyramid/services/socket_server.py +++ b/src/pyramid/services/socket_server.py @@ -1,27 +1,113 @@ +import asyncio +import socket +from socket import socket as sock +from typing import Any -from threading import Thread - -from pyramid.api.services import ILoggerService, ISocketServerService +from pyramid.api.services.logger import ILoggerService +from pyramid.api.services.socket_server import ISocketServerService from pyramid.api.services.tools.annotation import pyramid_service from pyramid.api.services.tools.injector import ServiceInjector -from pyramid.client.server import SocketServer -from pyramid.data.health import HealthModules - +from pyramid.client.common import ResponseCode, SocketCommon +from pyramid.client.requests.ask_request import AskRequest +from pyramid.client.responses.a_response import SocketResponse +from pyramid.data.ping import PingSocket @pyramid_service(interface=ISocketServerService) class SocketServerService(ISocketServerService, ServiceInjector): - def __init__(self): - pass + def __init__(self) -> None: + self.__common = SocketCommon() + self.__host = "0.0.0.0" + self.__port = self.__common.port + self.is_running = False + self.server_socket: sock | None = None + + def injectService(self, + logger_service: ILoggerService + ): + self.__logger = logger_service + + async def open(self): + self.server_socket = sock(socket.AF_INET, socket.SOCK_STREAM) + # self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((self.__host, self.__port)) + self.server_socket.listen(10) + + self.__logger.info("Socket server open on %s:%d", self.__host, self.__port) + + self.is_running = True + client_socket: sock | None = None + client_address: Any = None + client_ip: Any = None + client_port: Any = None + + while self.is_running: + try: + client_socket, client_address = self.server_socket.accept() + client_ip = client_address[0] + client_port = client_address[1] + response_to_send = await self.__handle_client(client_socket, client_ip, client_port) + if response_to_send: + # Convert the response data to JSON + response_json = SocketCommon.serialize( + response_to_send.to_json(SocketCommon.serialize) + ) + + # Send the JSON response back to the client + # self.__logger.debug("[%s:%d] <- %s", client_ip, client_port, response_json) + self.__common.send_chunk(client_socket, response_json) + except Exception as err: + if isinstance(err, OSError): + if err.errno == 9: + self.__logger.warning("Socket: [Errno 9] Bad file descriptor") + continue + if client_ip is not None and client_port is not None: + self.__logger.warning("[%s:%d] %s", client_ip, client_port, err, exc_info=True) + finally: + if client_socket is not None: + client_socket.close() + client_socket = None + client_address = None + client_ip = None + client_port = None + self.__logger.info("Socket server closed") + + def close(self): + self.is_running = False + if self.server_socket is None: + return + self.server_socket.shutdown(socket.SHUT_RDWR) + self.server_socket.close() + self.server_socket = None + self.__logger.info("Socket server stop") + + async def __handle_client(self, client_socket: sock, client_ip, client_port) -> SocketResponse | None: + data = self.__common.receive_chunk(client_socket) + + if not data: + self.__logger.info("[%s:%d] -> ", client_ip, client_port) + return + + def object_hook(json): + if isinstance(json, dict): + return AskRequest(**json) + return json + + json_data: AskRequest = SocketCommon.deserialize(data, object_hook=object_hook) + + response = SocketResponse() - def injectService(self, logger_service: ILoggerService): - self.logger_service = logger_service + if not json_data.action: + response.create(ResponseCode.ERROR, "Missing action field in JSON data") + return response - def start(self): - self._health = HealthModules() - self._health.configuration = True - self._health.discord = True + if json_data.action == "health": + data = PingSocket(True) + response.create(ResponseCode.OK, None, data) + return response - self.socket_server = SocketServer(self.logger_service.getChild("socket"), self._health) - thread = Thread(name="Socket", target=self.socket_server.start_server, daemon=True) - thread.start() + response.create(ResponseCode.ERROR, "Unknown action") + self.__logger.info( + "[%s:%d] <- Unknown action '%s'", client_ip, client_port, json_data.action + ) + return response diff --git a/src/pyramid/tasks/discord.py b/src/pyramid/tasks/discord.py index 6f22e66..15c4a3a 100644 --- a/src/pyramid/tasks/discord.py +++ b/src/pyramid/tasks/discord.py @@ -6,12 +6,7 @@ @pyramid_task(parameters=ParametersTask()) class DiscordTask(TaskInjector): - def __init__(self): - pass - - def injectService(self, - discord_service: IDiscordService - ): + def injectService(self, discord_service: IDiscordService): self.__discord_service = discord_service async def worker_asyc(self): diff --git a/src/pyramid/tasks/socket_server.py b/src/pyramid/tasks/socket_server.py new file mode 100644 index 0000000..e5ee8c0 --- /dev/null +++ b/src/pyramid/tasks/socket_server.py @@ -0,0 +1,17 @@ +from pyramid.api.services.socket_server import ISocketServerService +from pyramid.api.tasks.tools.annotation import pyramid_task +from pyramid.api.tasks.tools.injector import TaskInjector +from pyramid.api.tasks.tools.parameters import ParametersTask + + +@pyramid_task(parameters=ParametersTask()) +class SocketServerTask(TaskInjector): + + def injectService(self, socket_service: ISocketServerService): + self.__socket_server = socket_service + + async def worker_asyc(self): + await self.__socket_server.open() + + async def stop_asyc(self): + self.__socket_server.close() diff --git a/src/pyramid/tools/custom_queue.py b/src/pyramid/tools/custom_queue.py index 70e435e..8eaa543 100644 --- a/src/pyramid/tools/custom_queue.py +++ b/src/pyramid/tools/custom_queue.py @@ -101,7 +101,7 @@ def __init__(self, threads=1, name=None): def create_threads(self): for thread_id in range(1, self.__threads + 1): thread = Thread( - name="%s n°%d{thread_id}" % (self.__name, thread_id), + name="%s n°%d" % (self.__name, thread_id), target=self.__worker, args=(self.__queue, thread_id, self.__lock, self.__event), daemon=True, diff --git a/src/startup_cli.py b/src/startup_cli.py new file mode 100644 index 0000000..174ccc6 --- /dev/null +++ b/src/startup_cli.py @@ -0,0 +1,4 @@ +from pyramid.cli.startup import startup_cli + +if __name__ == "__main__": + startup_cli() diff --git a/src/startup_cli_dev.py b/src/startup_cli_dev.py new file mode 100644 index 0000000..c15c04d --- /dev/null +++ b/src/startup_cli_dev.py @@ -0,0 +1,10 @@ +import debugpy +from pyramid.cli.startup import startup_cli + +def startup_cli_dev(): + debugpy.listen(('0.0.0.0', 5679)) + debugpy.wait_for_client() + startup_cli() + +if __name__ == "__main__": + startup_cli_dev() diff --git a/src/dev.py b/src/startup_dev.py similarity index 100% rename from src/dev.py rename to src/startup_dev.py