diff --git a/.gitignore b/.gitignore index 6e9ba18522..a86f0abae0 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,4 @@ daphne.sock.lock .pytest_cache coverage.xml setup_dev.yml +11env/ diff --git a/api/tacticalrmm/accounts/management/commands/reset_2fa.py b/api/tacticalrmm/accounts/management/commands/reset_2fa.py index dd7c6b6738..4a71fb3dcc 100644 --- a/api/tacticalrmm/accounts/management/commands/reset_2fa.py +++ b/api/tacticalrmm/accounts/management/commands/reset_2fa.py @@ -1,11 +1,10 @@ -import os import subprocess -from contextlib import suppress import pyotp from django.core.management.base import BaseCommand from accounts.models import User +from tacticalrmm.helpers import get_webdomain class Command(BaseCommand): @@ -22,26 +21,13 @@ def handle(self, *args, **kwargs): self.stdout.write(self.style.ERROR(f"User {username} doesn't exist")) return - domain = "Tactical RMM" - nginx = "/etc/nginx/sites-available/frontend.conf" - found = None - if os.path.exists(nginx): - with suppress(Exception): - with open(nginx, "r") as f: - for line in f: - if "server_name" in line: - found = line - break - - if found: - rep = found.replace("server_name", "").replace(";", "") - domain = "".join(rep.split()) - code = pyotp.random_base32() user.totp_key = code user.save(update_fields=["totp_key"]) - url = pyotp.totp.TOTP(code).provisioning_uri(username, issuer_name=domain) + url = pyotp.totp.TOTP(code).provisioning_uri( + username, issuer_name=get_webdomain() + ) subprocess.run(f'qr "{url}"', shell=True) self.stdout.write( self.style.WARNING("Scan the barcode above with your authenticator app") diff --git a/api/tacticalrmm/accounts/management/commands/reset_password.py b/api/tacticalrmm/accounts/management/commands/reset_password.py index 0c2a4b68e8..73ae3b34bc 100644 --- a/api/tacticalrmm/accounts/management/commands/reset_password.py +++ b/api/tacticalrmm/accounts/management/commands/reset_password.py @@ -1,3 +1,5 @@ +from getpass import getpass + from django.core.management.base import BaseCommand from accounts.models import User @@ -17,7 +19,13 @@ def handle(self, *args, **kwargs): self.stdout.write(self.style.ERROR(f"User {username} doesn't exist")) return - passwd = input("Enter new password: ") - user.set_password(passwd) + pass1, pass2 = "foo", "bar" + while pass1 != pass2: + pass1 = getpass() + pass2 = getpass(prompt="Confirm Password:") + if pass1 != pass2: + self.stdout.write(self.style.ERROR("Passwords don't match")) + + user.set_password(pass1) user.save() self.stdout.write(self.style.SUCCESS(f"Password for {username} was reset!")) diff --git a/api/tacticalrmm/accounts/permissions.py b/api/tacticalrmm/accounts/permissions.py index 9a0422a379..e9e809ecb3 100644 --- a/api/tacticalrmm/accounts/permissions.py +++ b/api/tacticalrmm/accounts/permissions.py @@ -7,24 +7,23 @@ class AccountsPerms(permissions.BasePermission): def has_permission(self, r, view) -> bool: if r.method == "GET": return _has_perm(r, "can_list_accounts") - else: - # allow users to reset their own password/2fa see issue #686 - base_path = "/accounts/users/" - paths = ["reset/", "reset_totp/"] + # allow users to reset their own password/2fa see issue #686 + base_path = "/accounts/users/" + paths = ("reset/", "reset_totp/") - if r.path in [base_path + i for i in paths]: - from accounts.models import User + if r.path in [base_path + i for i in paths]: + from accounts.models import User - try: - user = User.objects.get(pk=r.data["id"]) - except User.DoesNotExist: - pass - else: - if user == r.user: - return True + try: + user = User.objects.get(pk=r.data["id"]) + except User.DoesNotExist: + pass + else: + if user == r.user: + return True - return _has_perm(r, "can_manage_accounts") + return _has_perm(r, "can_manage_accounts") class RolesPerms(permissions.BasePermission): diff --git a/api/tacticalrmm/agents/management/commands/demo_cron.py b/api/tacticalrmm/agents/management/commands/demo_cron.py index 831ae83be0..d53870bf1b 100644 --- a/api/tacticalrmm/agents/management/commands/demo_cron.py +++ b/api/tacticalrmm/agents/management/commands/demo_cron.py @@ -5,7 +5,7 @@ from django.utils import timezone as djangotime from agents.models import Agent -from core.tasks import cache_db_fields_task, handle_resolved_stuff +from core.tasks import cache_db_fields_task class Command(BaseCommand): @@ -30,4 +30,3 @@ def handle(self, *args, **kwargs): agent.save(update_fields=["last_seen"]) cache_db_fields_task() - handle_resolved_stuff() diff --git a/api/tacticalrmm/agents/models.py b/api/tacticalrmm/agents/models.py index c983c18d4e..c42e252267 100644 --- a/api/tacticalrmm/agents/models.py +++ b/api/tacticalrmm/agents/models.py @@ -40,7 +40,7 @@ PAAction, PAStatus, ) -from tacticalrmm.helpers import get_nats_ports +from tacticalrmm.helpers import setup_nats_options from tacticalrmm.models import PermissionQuerySet if TYPE_CHECKING: @@ -799,18 +799,10 @@ def _do_nats_debug(self, agent: "Agent", message: str) -> None: async def nats_cmd( self, data: Dict[Any, Any], timeout: int = 30, wait: bool = True ) -> Any: - nats_std_port, _ = get_nats_ports() - options = { - "servers": f"tls://{settings.ALLOWED_HOSTS[0]}:{nats_std_port}", - "user": "tacticalrmm", - "name": "trmm-django", - "password": settings.SECRET_KEY, - "connect_timeout": 3, - "max_reconnect_attempts": 2, - } + opts = setup_nats_options() try: - nc = await nats.connect(**options) + nc = await nats.connect(**opts) except: return "natsdown" diff --git a/api/tacticalrmm/agents/tasks.py b/api/tacticalrmm/agents/tasks.py index 388deb4145..6ee0bc23e8 100644 --- a/api/tacticalrmm/agents/tasks.py +++ b/api/tacticalrmm/agents/tasks.py @@ -1,5 +1,4 @@ import datetime as dt -import random from time import sleep from typing import TYPE_CHECKING, Optional @@ -13,10 +12,13 @@ from tacticalrmm.celery import app from tacticalrmm.constants import ( AGENT_DEFER, + AGENT_OUTAGES_LOCK, AGENT_STATUS_OVERDUE, CheckStatus, DebugLogType, ) +from tacticalrmm.helpers import rand_range +from tacticalrmm.utils import redis_lock if TYPE_CHECKING: from django.db.models.query import QuerySet @@ -46,7 +48,7 @@ def agent_outage_email_task(pk: int, alert_interval: Optional[float] = None) -> return "alert not found" if not alert.email_sent: - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) alert.agent.send_outage_email() alert.email_sent = djangotime.now() alert.save(update_fields=["email_sent"]) @@ -55,7 +57,7 @@ def agent_outage_email_task(pk: int, alert_interval: Optional[float] = None) -> # send an email only if the last email sent is older than alert interval delta = djangotime.now() - dt.timedelta(days=alert_interval) if alert.email_sent < delta: - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) alert.agent.send_outage_email() alert.email_sent = djangotime.now() alert.save(update_fields=["email_sent"]) @@ -67,7 +69,7 @@ def agent_outage_email_task(pk: int, alert_interval: Optional[float] = None) -> def agent_recovery_email_task(pk: int) -> str: from alerts.models import Alert - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) try: alert = Alert.objects.get(pk=pk) @@ -91,7 +93,7 @@ def agent_outage_sms_task(pk: int, alert_interval: Optional[float] = None) -> st return "alert not found" if not alert.sms_sent: - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) alert.agent.send_outage_sms() alert.sms_sent = djangotime.now() alert.save(update_fields=["sms_sent"]) @@ -100,7 +102,7 @@ def agent_outage_sms_task(pk: int, alert_interval: Optional[float] = None) -> st # send an sms only if the last sms sent is older than alert interval delta = djangotime.now() - dt.timedelta(days=alert_interval) if alert.sms_sent < delta: - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) alert.agent.send_outage_sms() alert.sms_sent = djangotime.now() alert.save(update_fields=["sms_sent"]) @@ -112,7 +114,7 @@ def agent_outage_sms_task(pk: int, alert_interval: Optional[float] = None) -> st def agent_recovery_sms_task(pk: int) -> str: from alerts.models import Alert - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) try: alert = Alert.objects.get(pk=pk) except Alert.DoesNotExist: @@ -125,24 +127,20 @@ def agent_recovery_sms_task(pk: int) -> str: return "ok" -@app.task -def agent_outages_task() -> None: - from alerts.models import Alert +@app.task(bind=True) +def agent_outages_task(self) -> str: + with redis_lock(AGENT_OUTAGES_LOCK, self.app.oid) as acquired: + if not acquired: + return f"{self.app.oid} still running" - agents = Agent.objects.only( - "pk", - "agent_id", - "last_seen", - "offline_time", - "overdue_time", - "overdue_email_alert", - "overdue_text_alert", - "overdue_dashboard_alert", - ) + from alerts.models import Alert + from core.tasks import _get_agent_qs - for agent in agents: - if agent.status == AGENT_STATUS_OVERDUE: - Alert.handle_alert_failure(agent) + for agent in _get_agent_qs(): + if agent.status == AGENT_STATUS_OVERDUE: + Alert.handle_alert_failure(agent) + + return "completed" @app.task diff --git a/api/tacticalrmm/agents/urls.py b/api/tacticalrmm/agents/urls.py index 73ffd2ba3d..7e77f25978 100644 --- a/api/tacticalrmm/agents/urls.py +++ b/api/tacticalrmm/agents/urls.py @@ -42,4 +42,5 @@ path("update/", views.update_agents), path("installer/", views.install_agent), path("bulkrecovery/", views.bulk_agent_recovery), + path("scripthistory/", views.ScriptRunHistory.as_view()), ] diff --git a/api/tacticalrmm/agents/views.py b/api/tacticalrmm/agents/views.py index 5443326713..27086ea597 100644 --- a/api/tacticalrmm/agents/views.py +++ b/api/tacticalrmm/agents/views.py @@ -6,11 +6,19 @@ from io import StringIO from pathlib import Path +from core.utils import ( + get_core_settings, + get_mesh_ws_url, + remove_mesh_agent, + token_is_valid, +) from django.conf import settings from django.db.models import Exists, OuterRef, Prefetch, Q from django.http import HttpResponse from django.shortcuts import get_object_or_404 from django.utils import timezone as djangotime +from django.utils.dateparse import parse_datetime +from logs.models import AuditLog, DebugLog, PendingAction from meshctrl.utils import get_login_token from packaging import version as pyver from rest_framework import serializers @@ -19,14 +27,6 @@ from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.views import APIView - -from core.utils import ( - get_core_settings, - get_mesh_ws_url, - remove_mesh_agent, - token_is_valid, -) -from logs.models import AuditLog, DebugLog, PendingAction from scripts.models import Script from scripts.tasks import handle_bulk_command_task, handle_bulk_script_task from tacticalrmm.constants import ( @@ -530,11 +530,10 @@ def patch(self, request, agent_id): @api_view(["POST"]) @permission_classes([IsAuthenticated, InstallAgentPerms]) def install_agent(request): - from knox.models import AuthToken - from accounts.models import User from agents.utils import get_agent_url from core.utils import token_is_valid + from knox.models import AuthToken client_id = request.data["client"] site_id = request.data["site"] @@ -1028,3 +1027,99 @@ def get(self, request, agent_id=None): history = AgentHistory.objects.filter_by_role(request.user) # type: ignore ctx = {"default_tz": get_default_timezone()} return Response(AgentHistorySerializer(history, many=True, context=ctx).data) + + +class ScriptRunHistory(APIView): + permission_classes = [IsAuthenticated, AgentHistoryPerms] + + class OutputSerializer(serializers.ModelSerializer): + script_name = serializers.ReadOnlyField(source="script.name") + agent_id = serializers.ReadOnlyField(source="agent.agent_id") + + class Meta: + model = AgentHistory + fields = ( + "id", + "time", + "username", + "script", + "script_results", + "agent", + "script_name", + "agent_id", + ) + read_only_fields = fields + + def get(self, request): + + date_range_filter = Q() + script_name_filter = Q() + + start = request.query_params.get("start", None) + end = request.query_params.get("end", None) + limit = request.query_params.get("limit", None) + script_name = request.query_params.get("scriptname", None) + if start and end: + start_dt = parse_datetime(start) + end_dt = parse_datetime(end) + djangotime.timedelta(days=1) + date_range_filter = Q(time__range=[start_dt, end_dt]) + + if script_name: + script_name_filter = Q(script__name=script_name) + + AGENT_R_DEFER = ( + "agent__wmi_detail", + "agent__services", + "agent__created_by", + "agent__created_time", + "agent__modified_by", + "agent__modified_time", + "agent__disks", + "agent__operating_system", + "agent__mesh_node_id", + "agent__description", + "agent__patches_last_installed", + "agent__time_zone", + "agent__alert_template_id", + "agent__policy_id", + "agent__site_id", + "agent__version", + "agent__plat", + "agent__goarch", + "agent__hostname", + "agent__last_seen", + "agent__public_ip", + "agent__total_ram", + "agent__boot_time", + "agent__logged_in_username", + "agent__last_logged_in_user", + "agent__monitoring_type", + "agent__overdue_email_alert", + "agent__overdue_text_alert", + "agent__overdue_dashboard_alert", + "agent__offline_time", + "agent__overdue_time", + "agent__check_interval", + "agent__needs_reboot", + "agent__choco_installed", + "agent__maintenance_mode", + "agent__block_policy_inheritance", + ) + hists = ( + AgentHistory.objects.filter(type=AgentHistoryType.SCRIPT_RUN) + .select_related("agent") + .select_related("script") + .defer(*AGENT_R_DEFER) + .filter(date_range_filter) + .filter(script_name_filter) + .order_by("-time") + ) + if limit: + try: + lim = int(limit) + except KeyError: + return notify_error("Invalid limit") + hists = hists[:lim] + + ret = self.OutputSerializer(hists, many=True).data + return Response(ret) diff --git a/api/tacticalrmm/alerts/models.py b/api/tacticalrmm/alerts/models.py index 8077bda5df..070a6780d0 100644 --- a/api/tacticalrmm/alerts/models.py +++ b/api/tacticalrmm/alerts/models.py @@ -172,12 +172,12 @@ def create_or_return_check_alert( alert_type=AlertType.CHECK, severity=check.alert_severity if check.check_type - not in [ + not in { CheckType.MEMORY, CheckType.CPU_LOAD, CheckType.DISK_SPACE, CheckType.SCRIPT, - ] + } else alert_severity, message=f"{agent.hostname} has a {check.check_type} check: {check.readable_desc} that failed.", hidden=True, @@ -328,12 +328,12 @@ def handle_alert_failure( alert_severity = ( instance.assigned_check.alert_severity if instance.assigned_check.check_type - not in [ + not in { CheckType.MEMORY, CheckType.CPU_LOAD, CheckType.DISK_SPACE, CheckType.SCRIPT, - ] + } else instance.alert_severity ) agent = instance.agent @@ -342,23 +342,20 @@ def handle_alert_failure( if alert_template: dashboard_severities = ( alert_template.check_dashboard_alert_severity - if alert_template.check_dashboard_alert_severity - else [ + or [ AlertSeverity.ERROR, AlertSeverity.WARNING, AlertSeverity.INFO, ] ) - email_severities = ( - alert_template.check_email_alert_severity - if alert_template.check_email_alert_severity - else [AlertSeverity.ERROR, AlertSeverity.WARNING] - ) - text_severities = ( - alert_template.check_text_alert_severity - if alert_template.check_text_alert_severity - else [AlertSeverity.ERROR, AlertSeverity.WARNING] - ) + email_severities = alert_template.check_email_alert_severity or [ + AlertSeverity.ERROR, + AlertSeverity.WARNING, + ] + text_severities = alert_template.check_text_alert_severity or [ + AlertSeverity.ERROR, + AlertSeverity.WARNING, + ] always_dashboard = alert_template.check_always_alert always_email = alert_template.check_always_email always_text = alert_template.check_always_text @@ -381,21 +378,18 @@ def handle_alert_failure( # set alert_template settings if alert_template: - dashboard_severities = ( - alert_template.task_dashboard_alert_severity - if alert_template.task_dashboard_alert_severity - else [AlertSeverity.ERROR, AlertSeverity.WARNING] - ) - email_severities = ( - alert_template.task_email_alert_severity - if alert_template.task_email_alert_severity - else [AlertSeverity.ERROR, AlertSeverity.WARNING] - ) - text_severities = ( - alert_template.task_text_alert_severity - if alert_template.task_text_alert_severity - else [AlertSeverity.ERROR, AlertSeverity.WARNING] - ) + dashboard_severities = alert_template.task_dashboard_alert_severity or [ + AlertSeverity.ERROR, + AlertSeverity.WARNING, + ] + email_severities = alert_template.task_email_alert_severity or [ + AlertSeverity.ERROR, + AlertSeverity.WARNING, + ] + text_severities = alert_template.task_text_alert_severity or [ + AlertSeverity.ERROR, + AlertSeverity.WARNING, + ] always_dashboard = alert_template.task_always_alert always_email = alert_template.task_always_email always_text = alert_template.task_always_text diff --git a/api/tacticalrmm/alerts/tests.py b/api/tacticalrmm/alerts/tests.py index ed75443ea0..09e4fc59f0 100644 --- a/api/tacticalrmm/alerts/tests.py +++ b/api/tacticalrmm/alerts/tests.py @@ -8,7 +8,7 @@ from alerts.tasks import cache_agents_alert_template from autotasks.models import TaskResult -from core.tasks import cache_db_fields_task, handle_resolved_stuff +from core.tasks import cache_db_fields_task, resolve_alerts_task from core.utils import get_core_settings from tacticalrmm.constants import AgentMonType, AlertSeverity, AlertType, CheckStatus from tacticalrmm.test import TacticalTestCase @@ -686,7 +686,7 @@ def test_handle_agent_alerts( agent_template_email.save() cache_db_fields_task() - handle_resolved_stuff() + resolve_alerts_task() recovery_sms.assert_called_with( pk=Alert.objects.get(agent=agent_template_text).pk @@ -1372,8 +1372,8 @@ def test_alert_actions( self, recovery_sms, recovery_email, outage_email, outage_sms, nats_cmd ): - from agents.tasks import agent_outages_task from agents.models import AgentHistory + from agents.tasks import agent_outages_task # Setup cmd mock success = { @@ -1449,7 +1449,7 @@ def test_alert_actions( agent.save() cache_db_fields_task() - handle_resolved_stuff() + resolve_alerts_task() # this is what data should be data = { @@ -1633,7 +1633,7 @@ def test_edit_delete_get_alert_permissions(self, delete): unauthorized_task_url, ] - for method in ["get", "put", "delete"]: + for method in ("get", "put", "delete"): # test superuser access for url in authorized_urls: diff --git a/api/tacticalrmm/apiv3/views.py b/api/tacticalrmm/apiv3/views.py index c370ac1f3e..1ca47714dc 100644 --- a/api/tacticalrmm/apiv3/views.py +++ b/api/tacticalrmm/apiv3/views.py @@ -255,9 +255,7 @@ def get(self, request, agentid): check.check_result.last_run < djangotime.now() - djangotime.timedelta( - seconds=check.run_interval - if check.run_interval - else agent.check_interval + seconds=check.run_interval or agent.check_interval ) ) ] diff --git a/api/tacticalrmm/automation/models.py b/api/tacticalrmm/automation/models.py index 27ce6d4343..68c67f248e 100644 --- a/api/tacticalrmm/automation/models.py +++ b/api/tacticalrmm/automation/models.py @@ -225,7 +225,7 @@ def get_policy_tasks(agent: "Agent") -> "List[AutomatedTask]": processed_policies = [] - for _, policy in policies.items(): + for policy in policies.values(): if policy and policy.active and policy.pk not in processed_policies: processed_policies.append(policy.pk) for task in policy.autotasks.all(): @@ -249,7 +249,7 @@ def get_policy_checks(agent: "Agent") -> "List[Check]": processed_policies = [] - for _, policy in policies.items(): + for policy in policies.values(): if policy and policy.active and policy.pk not in processed_policies: processed_policies.append(policy.pk) if policy.enforced: diff --git a/api/tacticalrmm/autotasks/models.py b/api/tacticalrmm/autotasks/models.py index 7105af7b9f..cd74cffffd 100644 --- a/api/tacticalrmm/autotasks/models.py +++ b/api/tacticalrmm/autotasks/models.py @@ -71,7 +71,7 @@ class AutomatedTask(BaseAuditModel): on_delete=models.SET_NULL, ) - # format -> [{"type": "script", "script": 1, "name": "Script Name", "timeout": 90, "script_args": []}, {"type": "cmd", "command": "whoami", "timeout": 90}] + # format -> [{"type": "script", "script": 1, "name": "Script Name", "timeout": 90, "script_args": [], "env_vars": []}, {"type": "cmd", "command": "whoami", "timeout": 90}] actions = JSONField(default=list) assigned_check = models.ForeignKey( "checks.Check", @@ -250,9 +250,7 @@ def generate_nats_task_payload( "trigger": self.task_type if self.task_type != TaskType.CHECK_FAILURE else TaskType.MANUAL, - "multiple_instances": self.task_instance_policy - if self.task_instance_policy - else 0, + "multiple_instances": self.task_instance_policy or 0, "delete_expired_task_after": self.remove_if_not_scheduled if self.expire_date else False, diff --git a/api/tacticalrmm/autotasks/serializers.py b/api/tacticalrmm/autotasks/serializers.py index 2c9e8dbed1..047e0c4ce0 100644 --- a/api/tacticalrmm/autotasks/serializers.py +++ b/api/tacticalrmm/autotasks/serializers.py @@ -1,3 +1,6 @@ +from datetime import datetime + +from django.utils import timezone as djangotime from rest_framework import serializers from scripts.models import Script @@ -97,6 +100,13 @@ def validate(self, data): del data["assigned_check"] return data + if ( + "expire_date" in data + and isinstance(data["expire_date"], datetime) + and djangotime.now() > data["expire_date"] + ): + raise serializers.ValidationError("Expires date/time is in the past") + # run_time_date required if ( data["task_type"] diff --git a/api/tacticalrmm/autotasks/tasks.py b/api/tacticalrmm/autotasks/tasks.py index a60b8e83db..6406589092 100644 --- a/api/tacticalrmm/autotasks/tasks.py +++ b/api/tacticalrmm/autotasks/tasks.py @@ -1,18 +1,25 @@ import asyncio import datetime as dt -import random +from collections import namedtuple from contextlib import suppress from time import sleep -from typing import Optional, Union +from typing import TYPE_CHECKING, Optional, Union +import msgpack +import nats from django.utils import timezone as djangotime +from nats.errors import TimeoutError from agents.models import Agent from alerts.models import Alert from autotasks.models import AutomatedTask, TaskResult -from logs.models import DebugLog from tacticalrmm.celery import app -from tacticalrmm.constants import DebugLogType +from tacticalrmm.constants import AGENT_STATUS_ONLINE, ORPHANED_WIN_TASK_LOCK +from tacticalrmm.helpers import rand_range, setup_nats_options +from tacticalrmm.utils import redis_lock + +if TYPE_CHECKING: + from nats.aio.client import Client as NATSClient @app.task @@ -79,56 +86,74 @@ def run_win_task(pk: int, agent_id: Optional[str] = None) -> str: return "ok" -@app.task -def remove_orphaned_win_tasks() -> None: - from agents.models import Agent - - for agent in Agent.online_agents(): - r = asyncio.run(agent.nats_cmd({"func": "listschedtasks"}, timeout=10)) - - if not isinstance(r, list): # empty list - DebugLog.error( - agent=agent, - log_type=DebugLogType.AGENT_ISSUES, - message=f"Unable to pull list of scheduled tasks on {agent.hostname}: {r}", - ) - continue - - agent_task_names = [ - task.win_task_name for task in agent.get_tasks_with_policies() - ] - - exclude_tasks = ( - "TacticalRMM_fixmesh", - "TacticalRMM_SchedReboot", - "TacticalRMM_sync", - "TacticalRMM_agentupdate", - ) +@app.task(bind=True) +def remove_orphaned_win_tasks(self) -> str: + with redis_lock(ORPHANED_WIN_TASK_LOCK, self.app.oid) as acquired: + if not acquired: + return f"{self.app.oid} still running" + + from core.tasks import _get_agent_qs + + AgentTup = namedtuple("AgentTup", ["agent_id", "task_names"]) + items: "list[AgentTup]" = [] + exclude_tasks = ("TacticalRMM_SchedReboot",) + + for agent in _get_agent_qs(): + if agent.status == AGENT_STATUS_ONLINE: + + names = [task.win_task_name for task in agent.get_tasks_with_policies()] + items.append(AgentTup._make([agent.agent_id, names])) + + async def _handle_task(nc: "NATSClient", sub, data, names) -> str: + try: + msg = await nc.request( + subject=sub, payload=msgpack.dumps(data), timeout=5 + ) + except TimeoutError: + return "timeout" + + try: + r = msgpack.loads(msg.data) + except Exception as e: + return str(e) + + if not isinstance(r, list): + return "notlist" + + for name in r: + if name.startswith(exclude_tasks): + # skip system tasks or any pending reboots + continue + + if name.startswith("TacticalRMM_") and name not in names: + nats_data = { + "func": "delschedtask", + "schedtaskpayload": {"name": name}, + } + print(f"Deleting orphaned task: {name} on agent {sub}") + await nc.publish(subject=sub, payload=msgpack.dumps(nats_data)) + + return "ok" + + async def _run() -> None: + opts = setup_nats_options() + try: + nc = await nats.connect(**opts) + except Exception as e: + return str(e) + + payload = {"func": "listschedtasks"} + tasks = [ + _handle_task( + nc=nc, sub=item.agent_id, data=payload, names=item.task_names + ) + for item in items + ] + await asyncio.gather(*tasks) + await nc.close() - for task in r: - if task.startswith(exclude_tasks): - # skip system tasks or any pending reboots - continue - - if task.startswith("TacticalRMM_") and task not in agent_task_names: - # delete task since it doesn't exist in UI - nats_data = { - "func": "delschedtask", - "schedtaskpayload": {"name": task}, - } - ret = asyncio.run(agent.nats_cmd(nats_data, timeout=10)) - if ret != "ok": - DebugLog.error( - agent=agent, - log_type=DebugLogType.AGENT_ISSUES, - message=f"Unable to clean up orphaned task {task} on {agent.hostname}: {ret}", - ) - else: - DebugLog.info( - agent=agent, - log_type=DebugLogType.AGENT_ISSUES, - message=f"Removed orphaned task {task} from {agent.hostname}", - ) + asyncio.run(_run()) + return "completed" @app.task @@ -144,7 +169,7 @@ def handle_task_email_alert(pk: int, alert_interval: Union[float, None] = None) task_result = TaskResult.objects.get( task=alert.assigned_task, agent=alert.agent ) - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) task_result.send_email() alert.email_sent = djangotime.now() alert.save(update_fields=["email_sent"]) @@ -156,7 +181,7 @@ def handle_task_email_alert(pk: int, alert_interval: Union[float, None] = None) task_result = TaskResult.objects.get( task=alert.assigned_task, agent=alert.agent ) - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) task_result.send_email() alert.email_sent = djangotime.now() alert.save(update_fields=["email_sent"]) @@ -177,7 +202,7 @@ def handle_task_sms_alert(pk: int, alert_interval: Union[float, None] = None) -> task_result = TaskResult.objects.get( task=alert.assigned_task, agent=alert.agent ) - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) task_result.send_sms() alert.sms_sent = djangotime.now() alert.save(update_fields=["sms_sent"]) @@ -189,7 +214,7 @@ def handle_task_sms_alert(pk: int, alert_interval: Union[float, None] = None) -> task_result = TaskResult.objects.get( task=alert.assigned_task, agent=alert.agent ) - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) task_result.send_sms() alert.sms_sent = djangotime.now() alert.save(update_fields=["sms_sent"]) @@ -210,7 +235,7 @@ def handle_resolved_task_sms_alert(pk: int) -> str: task_result = TaskResult.objects.get( task=alert.assigned_task, agent=alert.agent ) - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) task_result.send_resolved_sms() alert.resolved_sms_sent = djangotime.now() alert.save(update_fields=["resolved_sms_sent"]) @@ -231,7 +256,7 @@ def handle_resolved_task_email_alert(pk: int) -> str: task_result = TaskResult.objects.get( task=alert.assigned_task, agent=alert.agent ) - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) task_result.send_resolved_email() alert.resolved_email_sent = djangotime.now() alert.save(update_fields=["resolved_email_sent"]) diff --git a/api/tacticalrmm/autotasks/tests.py b/api/tacticalrmm/autotasks/tests.py index f88bc1a85c..847543bf6d 100644 --- a/api/tacticalrmm/autotasks/tests.py +++ b/api/tacticalrmm/autotasks/tests.py @@ -1,4 +1,4 @@ -from unittest.mock import call, patch +from unittest.mock import patch from django.utils import timezone as djangotime from model_bakery import baker @@ -8,7 +8,7 @@ from .models import AutomatedTask, TaskResult, TaskSyncStatus from .serializers import TaskSerializer -from .tasks import create_win_task_schedule, remove_orphaned_win_tasks, run_win_task +from .tasks import create_win_task_schedule, run_win_task base_url = "/tasks" @@ -137,7 +137,7 @@ def test_add_autotask(self, create_win_task_schedule): "weekly_interval": 2, "run_time_bit_weekdays": 26, "run_time_date": djangotime.now(), - "expire_date": djangotime.now(), + "expire_date": djangotime.now() + djangotime.timedelta(weeks=5), "repetition_interval": "30S", "repetition_duration": "1H", "random_task_delay": "5M", @@ -160,7 +160,7 @@ def test_add_autotask(self, create_win_task_schedule): "monthly_months_of_year": 56, "monthly_days_of_month": 350, "run_time_date": djangotime.now(), - "expire_date": djangotime.now(), + "expire_date": djangotime.now() + djangotime.timedelta(weeks=5), "repetition_interval": "30S", "repetition_duration": "1H", "random_task_delay": "5M", @@ -183,7 +183,7 @@ def test_add_autotask(self, create_win_task_schedule): "monthly_weeks_of_month": 4, "run_time_bit_weekdays": 15, "run_time_date": djangotime.now(), - "expire_date": djangotime.now(), + "expire_date": djangotime.now() + djangotime.timedelta(weeks=5), "repetition_interval": "30S", "repetition_duration": "1H", "random_task_delay": "5M", @@ -206,7 +206,7 @@ def test_add_autotask(self, create_win_task_schedule): "monthly_weeks_of_month": 4, "run_time_bit_weekdays": 15, "run_time_date": djangotime.now(), - "expire_date": djangotime.now(), + "expire_date": djangotime.now() + djangotime.timedelta(weeks=5), "repetition_interval": "30S", "repetition_duration": "1H", "random_task_delay": "5M", @@ -296,7 +296,7 @@ def test_update_autotask(self): "monthly_weeks_of_month": 4, "run_time_bit_weekdays": 15, "run_time_date": djangotime.now(), - "expire_date": djangotime.now(), + "expire_date": djangotime.now() + djangotime.timedelta(weeks=5), "repetition_interval": "30S", "repetition_duration": "1H", "random_task_delay": "5M", @@ -316,7 +316,7 @@ def test_update_autotask(self): "monthly_weeks_of_month": 4, "run_time_bit_weekdays": 15, "run_time_date": djangotime.now(), - "expire_date": djangotime.now(), + "expire_date": djangotime.now() + djangotime.timedelta(weeks=5), "repetition_interval": "30S", "repetition_duration": "1H", "random_task_delay": "5M", @@ -383,60 +383,6 @@ def setUp(self): self.authenticate() self.setup_coresettings() - @patch("agents.models.Agent.nats_cmd") - def test_remove_orphaned_win_task(self, nats_cmd): - agent = baker.make_recipe("agents.online_agent") - baker.make_recipe("agents.offline_agent") - task1 = AutomatedTask.objects.create( - agent=agent, - name="test task 1", - ) - - # test removing an orphaned task - win_tasks = [ - "Adobe Acrobat Update Task", - "AdobeGCInvoker-1.0", - "GoogleUpdateTaskMachineCore", - "GoogleUpdateTaskMachineUA", - "OneDrive Standalone Update Task-S-1-5-21-717461175-241712648-1206041384-1001", - task1.win_task_name, - "TacticalRMM_fixmesh", - "TacticalRMM_SchedReboot_jk324kajd", - "TacticalRMM_iggrLcOaldIZnUzLuJWPLNwikiOoJJHHznb", # orphaned task - ] - - calls = [ - call({"func": "listschedtasks"}, timeout=10), - call( - { - "func": "delschedtask", - "schedtaskpayload": { - "name": "TacticalRMM_iggrLcOaldIZnUzLuJWPLNwikiOoJJHHznb" - }, - }, - timeout=10, - ), - ] - - nats_cmd.side_effect = [win_tasks, "ok"] - remove_orphaned_win_tasks() - self.assertEqual(nats_cmd.call_count, 2) - nats_cmd.assert_has_calls(calls) - - # test nats delete task fail - nats_cmd.reset_mock() - nats_cmd.side_effect = [win_tasks, "error deleting task"] - remove_orphaned_win_tasks() - nats_cmd.assert_has_calls(calls) - self.assertEqual(nats_cmd.call_count, 2) - - # no orphaned tasks - nats_cmd.reset_mock() - win_tasks.remove("TacticalRMM_iggrLcOaldIZnUzLuJWPLNwikiOoJJHHznb") - nats_cmd.side_effect = [win_tasks, "ok"] - remove_orphaned_win_tasks() - self.assertEqual(nats_cmd.call_count, 1) - @patch("agents.models.Agent.nats_cmd") def test_run_win_task(self, nats_cmd): self.agent = baker.make_recipe("agents.agent") @@ -868,7 +814,7 @@ def test_add_task_permissions(self): url = f"{base_url}/" - for data in [policy_data, agent_data]: + for data in (policy_data, agent_data): # test superuser access self.check_authorized_superuser("post", url, data) @@ -904,7 +850,7 @@ def test_task_get_edit_delete_permissions(self, delete_task): ) policy_task = baker.make("autotasks.AutomatedTask", policy=policy) - for method in ["get", "put", "delete"]: + for method in ("get", "put", "delete"): url = f"{base_url}/{task.id}/" unauthorized_url = f"{base_url}/{unauthorized_task.id}/" diff --git a/api/tacticalrmm/checks/tasks.py b/api/tacticalrmm/checks/tasks.py index 97f6642074..ae87947e28 100644 --- a/api/tacticalrmm/checks/tasks.py +++ b/api/tacticalrmm/checks/tasks.py @@ -1,5 +1,4 @@ import datetime as dt -import random from time import sleep from typing import Optional @@ -8,6 +7,7 @@ from alerts.models import Alert from checks.models import CheckResult from tacticalrmm.celery import app +from tacticalrmm.helpers import rand_range @app.task @@ -24,7 +24,7 @@ def handle_check_email_alert_task( check_result = CheckResult.objects.get( assigned_check=alert.assigned_check, agent=alert.agent ) - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) check_result.send_email() alert.email_sent = djangotime.now() alert.save(update_fields=["email_sent"]) @@ -36,7 +36,7 @@ def handle_check_email_alert_task( check_result = CheckResult.objects.get( assigned_check=alert.assigned_check, agent=alert.agent ) - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) check_result.send_email() alert.email_sent = djangotime.now() alert.save(update_fields=["email_sent"]) @@ -57,7 +57,7 @@ def handle_check_sms_alert_task(pk: int, alert_interval: Optional[float] = None) check_result = CheckResult.objects.get( assigned_check=alert.assigned_check, agent=alert.agent ) - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) check_result.send_sms() alert.sms_sent = djangotime.now() alert.save(update_fields=["sms_sent"]) @@ -69,7 +69,7 @@ def handle_check_sms_alert_task(pk: int, alert_interval: Optional[float] = None) check_result = CheckResult.objects.get( assigned_check=alert.assigned_check, agent=alert.agent ) - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) check_result.send_sms() alert.sms_sent = djangotime.now() alert.save(update_fields=["sms_sent"]) @@ -90,7 +90,7 @@ def handle_resolved_check_sms_alert_task(pk: int) -> str: check_result = CheckResult.objects.get( assigned_check=alert.assigned_check, agent=alert.agent ) - sleep(random.randint(1, 3)) + sleep(rand_range(100, 1500)) check_result.send_resolved_sms() alert.resolved_sms_sent = djangotime.now() alert.save(update_fields=["resolved_sms_sent"]) @@ -111,7 +111,7 @@ def handle_resolved_check_email_alert_task(pk: int) -> str: check_result = CheckResult.objects.get( assigned_check=alert.assigned_check, agent=alert.agent ) - sleep(random.randint(1, 5)) + sleep(rand_range(100, 1500)) check_result.send_resolved_email() alert.resolved_email_sent = djangotime.now() alert.save(update_fields=["resolved_email_sent"]) diff --git a/api/tacticalrmm/checks/tests.py b/api/tacticalrmm/checks/tests.py index 613a4dc995..0ef581769f 100644 --- a/api/tacticalrmm/checks/tests.py +++ b/api/tacticalrmm/checks/tests.py @@ -101,7 +101,7 @@ def test_add_disk_check(self): "fails_b4_alert": 3, } - for payload in [agent_payload, policy_payload]: + for payload in (agent_payload, policy_payload): # add valid check resp = self.client.post(url, payload, format="json") @@ -148,7 +148,7 @@ def test_add_cpuload_check(self): "fails_b4_alert": 9, } - for payload in [agent_payload, policy_payload]: + for payload in (agent_payload, policy_payload): # add cpu check resp = self.client.post(url, payload, format="json") @@ -195,7 +195,7 @@ def test_add_memory_check(self): "fails_b4_alert": 1, } - for payload in [agent_payload, policy_payload]: + for payload in (agent_payload, policy_payload): # add memory check resp = self.client.post(url, payload, format="json") @@ -972,7 +972,7 @@ def test_add_check_permissions(self): url = f"{base_url}/" - for data in [policy_data, agent_data]: + for data in (policy_data, agent_data): # test superuser access self.check_authorized_superuser("post", url, data) @@ -1006,7 +1006,7 @@ def test_check_get_edit_delete_permissions(self, delete_check): unauthorized_check = baker.make("checks.Check", agent=unauthorized_agent) policy_check = baker.make("checks.Check", policy=policy) - for method in ["get", "put", "delete"]: + for method in ("get", "put", "delete"): url = f"{base_url}/{check.id}/" unauthorized_url = f"{base_url}/{unauthorized_check.id}/" @@ -1060,7 +1060,7 @@ def test_check_action_permissions(self, nats_cmd): assigned_check=unauthorized_check, ) - for action in ["reset", "run"]: + for action in ("reset", "run"): if action == "reset": url = f"{base_url}/{check_result.id}/{action}/" unauthorized_url = ( diff --git a/api/tacticalrmm/core/management/commands/clear_redis_celery_locks.py b/api/tacticalrmm/core/management/commands/clear_redis_celery_locks.py new file mode 100644 index 0000000000..ece7fc9137 --- /dev/null +++ b/api/tacticalrmm/core/management/commands/clear_redis_celery_locks.py @@ -0,0 +1,22 @@ +from django.core.cache import cache +from django.core.management.base import BaseCommand + +from tacticalrmm.constants import ( + AGENT_OUTAGES_LOCK, + ORPHANED_WIN_TASK_LOCK, + RESOLVE_ALERTS_LOCK, + SYNC_SCHED_TASK_LOCK, +) + + +class Command(BaseCommand): + help = "Clear redis celery locks. Should only be ran while celery/beat is stopped." + + def handle(self, *args, **kwargs): + for key in ( + AGENT_OUTAGES_LOCK, + ORPHANED_WIN_TASK_LOCK, + RESOLVE_ALERTS_LOCK, + SYNC_SCHED_TASK_LOCK, + ): + cache.delete(key) diff --git a/api/tacticalrmm/core/management/commands/get_config.py b/api/tacticalrmm/core/management/commands/get_config.py index 2979561b24..4374228d96 100644 --- a/api/tacticalrmm/core/management/commands/get_config.py +++ b/api/tacticalrmm/core/management/commands/get_config.py @@ -3,6 +3,8 @@ from django.conf import settings from django.core.management.base import BaseCommand +from tacticalrmm.helpers import get_webdomain + class Command(BaseCommand): help = "Get config vars to be used in shell scripts" @@ -25,7 +27,7 @@ def handle(self, *args, **kwargs): case "frontend": self.stdout.write(settings.CORS_ORIGIN_WHITELIST[0]) case "webdomain": - self.stdout.write(urlparse(settings.CORS_ORIGIN_WHITELIST[0]).netloc) + self.stdout.write(get_webdomain()) case "djangoadmin": url = f"https://{settings.ALLOWED_HOSTS[0]}/{settings.ADMIN_URL}" self.stdout.write(url) diff --git a/api/tacticalrmm/core/management/commands/run_all_tasks.py b/api/tacticalrmm/core/management/commands/run_all_tasks.py index f327413eb9..006058ed07 100644 --- a/api/tacticalrmm/core/management/commands/run_all_tasks.py +++ b/api/tacticalrmm/core/management/commands/run_all_tasks.py @@ -6,7 +6,9 @@ from core.tasks import ( cache_db_fields_task, core_maintenance_tasks, - handle_resolved_stuff, + resolve_alerts_task, + resolve_pending_actions, + sync_scheduled_tasks, ) from winupdate.tasks import auto_approve_updates_task, check_agent_update_schedule_task @@ -20,7 +22,9 @@ def handle(self, *args, **kwargs): unsnooze_alerts.delay() cache_db_fields_task.delay() core_maintenance_tasks.delay() - handle_resolved_stuff.delay() + resolve_pending_actions.delay() + resolve_alerts_task.delay() + sync_scheduled_tasks.delay() remove_orphaned_win_tasks.delay() auto_approve_updates_task.delay() check_agent_update_schedule_task.delay() diff --git a/api/tacticalrmm/core/tasks.py b/api/tacticalrmm/core/tasks.py index 0cf51e2578..c5831803b5 100644 --- a/api/tacticalrmm/core/tasks.py +++ b/api/tacticalrmm/core/tasks.py @@ -1,14 +1,16 @@ -from typing import TYPE_CHECKING, Any, Dict +import time +from typing import TYPE_CHECKING, Any from django.conf import settings from django.db.models import Prefetch +from django.utils import timezone as djangotime from packaging import version as pyver from agents.models import Agent from agents.tasks import clear_faults_task, prune_agent_history from alerts.models import Alert from alerts.tasks import prune_resolved_alerts -from autotasks.models import TaskResult +from autotasks.models import AutomatedTask, TaskResult from checks.models import Check, CheckResult from checks.tasks import prune_check_history from clients.models import Client, Site @@ -20,6 +22,8 @@ AGENT_DEFER, AGENT_STATUS_ONLINE, AGENT_STATUS_OVERDUE, + RESOLVE_ALERTS_LOCK, + SYNC_SCHED_TASK_LOCK, AlertSeverity, AlertType, PAAction, @@ -27,6 +31,8 @@ TaskStatus, TaskSyncStatus, ) +from tacticalrmm.helpers import rand_range +from tacticalrmm.utils import DjangoConnectionThreadPoolExecutor, redis_lock if TYPE_CHECKING: from django.db.models import QuerySet @@ -34,6 +40,10 @@ @app.task def core_maintenance_tasks() -> None: + AutomatedTask.objects.filter( + remove_if_not_scheduled=True, expire_date__lt=djangotime.now() + ).delete() + core = get_core_settings() # remove old CheckHistory data @@ -62,16 +72,15 @@ def core_maintenance_tasks() -> None: @app.task -def handle_resolved_stuff() -> None: - +def resolve_pending_actions() -> None: # change agent update pending status to completed if agent has just updated - actions = ( + actions: "QuerySet[PendingAction]" = ( PendingAction.objects.select_related("agent") .defer("agent__services", "agent__wmi_detail") .filter(action_type=PAAction.AGENT_UPDATE, status=PAStatus.PENDING) ) - to_update = [ + to_update: list[int] = [ action.id for action in actions if pyver.parse(action.agent.version) == pyver.parse(settings.LATEST_AGENT_VER) @@ -80,7 +89,9 @@ def handle_resolved_stuff() -> None: PendingAction.objects.filter(pk__in=to_update).update(status=PAStatus.COMPLETED) - agent_queryset = ( + +def _get_agent_qs() -> "QuerySet[Agent]": + qs: "QuerySet[Agent]" = ( Agent.objects.defer(*AGENT_DEFER) .select_related( "site__server_policy", @@ -88,6 +99,7 @@ def handle_resolved_stuff() -> None: "site__client__server_policy", "site__client__workstation_policy", "policy", + "policy__alert_template", "alert_template", ) .prefetch_related( @@ -106,34 +118,83 @@ def handle_resolved_stuff() -> None: "autotasks", ) ) - - for agent in agent_queryset: - if ( - pyver.parse(agent.version) >= pyver.parse("1.6.0") - and agent.status == AGENT_STATUS_ONLINE - ): - # sync scheduled tasks - for task in agent.get_tasks_with_policies(): - if ( - not task.task_result - or task.task_result.sync_status == TaskSyncStatus.INITIAL - ): - task.create_task_on_agent(agent=agent if task.policy else None) - elif task.task_result.sync_status == TaskSyncStatus.PENDING_DELETION: - task.delete_task_on_agent(agent=agent if task.policy else None) - elif task.task_result.sync_status == TaskSyncStatus.NOT_SYNCED: - task.modify_task_on_agent(agent=agent if task.policy else None) - elif task.task_result.sync_status == TaskSyncStatus.SYNCED: - continue - - # handles any alerting actions - if Alert.objects.filter( - alert_type=AlertType.AVAILABILITY, agent=agent, resolved=False - ).exists(): - Alert.handle_alert_resolve(agent) - - -def _get_failing_data(agents: "QuerySet[Any]") -> Dict[str, bool]: + return qs + + +@app.task(bind=True) +def resolve_alerts_task(self) -> str: + with redis_lock(RESOLVE_ALERTS_LOCK, self.app.oid) as acquired: + if not acquired: + return f"{self.app.oid} still running" + + # TODO rework this to not use an agent queryset, use Alerts + for agent in _get_agent_qs(): + if ( + pyver.parse(agent.version) >= pyver.parse("1.6.0") + and agent.status == AGENT_STATUS_ONLINE + ): + # handles any alerting actions + if Alert.objects.filter( + alert_type=AlertType.AVAILABILITY, agent=agent, resolved=False + ).exists(): + Alert.handle_alert_resolve(agent) + + return "completed" + + +@app.task(bind=True) +def sync_scheduled_tasks(self) -> str: + with redis_lock(SYNC_SCHED_TASK_LOCK, self.app.oid) as acquired: + if not acquired: + return f"{self.app.oid} still running" + + task_actions = [] # list of tuples + for agent in _get_agent_qs(): + if ( + pyver.parse(agent.version) >= pyver.parse("1.6.0") + and agent.status == AGENT_STATUS_ONLINE + ): + # create a list of tasks to be synced so we can run them in parallel later with thread pool executor + for task in agent.get_tasks_with_policies(): + agent_obj = agent if task.policy else None + + # policy tasks will be an empty dict on initial + if (not task.task_result) or ( + isinstance(task.task_result, TaskResult) + and task.task_result.sync_status == TaskSyncStatus.INITIAL + ): + task_actions.append(("create", task.id, agent_obj)) + elif ( + isinstance(task.task_result, TaskResult) + and task.task_result.sync_status + == TaskSyncStatus.PENDING_DELETION + ): + task_actions.append(("delete", task.id, agent_obj)) + elif ( + isinstance(task.task_result, TaskResult) + and task.task_result.sync_status == TaskSyncStatus.NOT_SYNCED + ): + task_actions.append(("modify", task.id, agent_obj)) + + def _handle_task(actions: tuple[str, int, Any]) -> None: + time.sleep(rand_range(50, 600)) + task: "AutomatedTask" = AutomatedTask.objects.get(id=actions[1]) + if actions[0] == "create": + task.create_task_on_agent(agent=actions[2]) + elif actions[0] == "modify": + task.modify_task_on_agent(agent=actions[2]) + elif actions[0] == "delete": + task.delete_task_on_agent(agent=actions[2]) + + # TODO this is a janky hack + # Rework this with asyncio. Need to rewrite all sync db operations with django's new async api + with DjangoConnectionThreadPoolExecutor(max_workers=50) as executor: + executor.map(_handle_task, task_actions) + + return "completed" + + +def _get_failing_data(agents: "QuerySet[Agent]") -> dict[str, bool]: data = {"error": False, "warning": False} for agent in agents: if agent.maintenance_mode: @@ -181,32 +242,7 @@ def _get_failing_data(agents: "QuerySet[Any]") -> Dict[str, bool]: @app.task def cache_db_fields_task() -> None: - qs = ( - Agent.objects.defer(*AGENT_DEFER) - .select_related( - "site__server_policy", - "site__workstation_policy", - "site__client__server_policy", - "site__client__workstation_policy", - "policy__alert_template", - "alert_template", - ) - .prefetch_related( - Prefetch( - "agentchecks", - queryset=Check.objects.select_related("script"), - ), - Prefetch( - "checkresults", - queryset=CheckResult.objects.select_related("assigned_check"), - ), - Prefetch( - "taskresults", - queryset=TaskResult.objects.select_related("task"), - ), - "autotasks", - ) - ) + qs = _get_agent_qs() # update client/site failing check fields and agent counts for site in Site.objects.all(): agents = qs.filter(site=site) diff --git a/api/tacticalrmm/core/tests.py b/api/tacticalrmm/core/tests.py index 32bd7ae881..fb0bd029ea 100644 --- a/api/tacticalrmm/core/tests.py +++ b/api/tacticalrmm/core/tests.py @@ -24,7 +24,7 @@ from .consumers import DashInfo from .models import CustomField, GlobalKVStore, URLAction from .serializers import CustomFieldSerializer, KeyStoreSerializer, URLActionSerializer -from .tasks import core_maintenance_tasks, handle_resolved_stuff +from .tasks import core_maintenance_tasks, resolve_pending_actions class TestCodeSign(TacticalTestCase): @@ -422,7 +422,7 @@ def test_resolved_pending_agentupdate_task(self): Agent.objects.update(version=settings.LATEST_AGENT_VER) - handle_resolved_stuff() + resolve_pending_actions() complete = PendingAction.objects.filter( action_type=PAAction.AGENT_UPDATE, status=PAStatus.COMPLETED diff --git a/api/tacticalrmm/core/views.py b/api/tacticalrmm/core/views.py index ff692c1a7d..bd07ff6fac 100644 --- a/api/tacticalrmm/core/views.py +++ b/api/tacticalrmm/core/views.py @@ -130,9 +130,7 @@ def server_maintenance(request): from autotasks.tasks import remove_orphaned_win_tasks remove_orphaned_win_tasks.delay() - return Response( - "The task has been initiated. Check the Debug Log in the UI for progress." - ) + return Response("The task has been initiated.") if request.data["action"] == "prune_db": from logs.models import AuditLog, PendingAction diff --git a/api/tacticalrmm/requirements.txt b/api/tacticalrmm/requirements.txt index 08c2036fa0..02f9f9d328 100644 --- a/api/tacticalrmm/requirements.txt +++ b/api/tacticalrmm/requirements.txt @@ -7,18 +7,17 @@ channels_redis==4.0.0 chardet==4.0.0 cryptography==38.0.4 daphne==4.0.0 -Django==4.1.4 +Django==4.1.5 django-cors-headers==3.13.0 django-ipware==4.0.2 django-rest-knox==4.2.0 djangorestframework==3.14.0 drf-spectacular==0.25.1 -future==0.18.2 -hiredis==2.1.0 +hiredis==2.1.1 meshctrl==0.1.15 msgpack==1.0.4 nats-py==2.2.0 -packaging==22.0 +packaging==23.0 psutil==5.9.4 psycopg2-binary==2.9.5 pycparser==2.21 @@ -27,12 +26,12 @@ pyotp==2.8.0 pyparsing==3.0.9 pytz==2022.5 qrcode==7.3.1 -redis==4.4.0 -requests==2.28.1 +redis==4.3.5 +requests==2.28.2 six==1.16.0 sqlparse==0.4.3 -twilio==7.16.0 -urllib3==1.26.13 +twilio==7.16.1 +urllib3==1.26.14 uWSGI==2.0.21 validators==0.20.0 vine==5.0.0 diff --git a/api/tacticalrmm/scripts/tasks.py b/api/tacticalrmm/scripts/tasks.py index c3707b5b16..105dac62df 100644 --- a/api/tacticalrmm/scripts/tasks.py +++ b/api/tacticalrmm/scripts/tasks.py @@ -1,10 +1,17 @@ import asyncio -from typing import List +from typing import TYPE_CHECKING, List + +import msgpack +import nats from agents.models import Agent, AgentHistory from scripts.models import Script from tacticalrmm.celery import app from tacticalrmm.constants import AgentHistoryType +from tacticalrmm.helpers import setup_nats_options + +if TYPE_CHECKING: + from nats.aio.client import Client as NATSClient @app.task @@ -16,6 +23,8 @@ def handle_bulk_command_task( username, run_as_user: bool = False, ) -> None: + + items = [] nats_data = { "func": "rawcmd", "timeout": timeout, @@ -33,9 +42,26 @@ def handle_bulk_command_task( command=cmd, username=username, ) - nats_data["id"] = hist.pk + tmp = {**nats_data} + tmp["id"] = hist.pk + items.append((agent.agent_id, tmp)) + + async def _run_cmd(nc: "NATSClient", sub, data) -> None: + await nc.publish(subject=sub, payload=msgpack.dumps(data)) + + async def _run() -> None: + opts = setup_nats_options() + try: + nc = await nats.connect(**opts) + except Exception as e: + print(e) + return + + tasks = [_run_cmd(nc=nc, sub=item[0], data=item[1]) for item in items] + await asyncio.gather(*tasks) + await nc.close() - asyncio.run(agent.nats_cmd(nats_data, wait=False)) + asyncio.run(_run()) @app.task diff --git a/api/tacticalrmm/tacticalrmm/auth.py b/api/tacticalrmm/tacticalrmm/auth.py index e5e8615b80..0a8995dd8b 100644 --- a/api/tacticalrmm/tacticalrmm/auth.py +++ b/api/tacticalrmm/tacticalrmm/auth.py @@ -59,6 +59,6 @@ def authenticate_credentials(self, key): # check if token is expired if apikey.expiration and apikey.expiration < djangotime.now(): - raise exceptions.AuthenticationFailed(_("The token as expired.")) + raise exceptions.AuthenticationFailed(_("The token has expired.")) return apikey.user, apikey.key diff --git a/api/tacticalrmm/tacticalrmm/cache.py b/api/tacticalrmm/tacticalrmm/cache.py index f487bbb0a7..077054278d 100644 --- a/api/tacticalrmm/tacticalrmm/cache.py +++ b/api/tacticalrmm/tacticalrmm/cache.py @@ -6,14 +6,14 @@ class TacticalRedisCache(RedisCache): def delete_many_pattern(self, pattern: str, version: Optional[int] = None) -> None: - keys = self._cache.get_client().keys(f":{version if version else 1}:{pattern}") + keys = self._cache.get_client().keys(f":{version or 1}:{pattern}") if keys: self._cache.delete_many(keys) # just for debugging def show_everything(self, version: Optional[int] = None) -> list[bytes]: - return self._cache.get_client().keys(f":{version if version else 1}:*") + return self._cache.get_client().keys(f":{version or 1}:*") class TacticalDummyCache(DummyCache): diff --git a/api/tacticalrmm/tacticalrmm/celery.py b/api/tacticalrmm/tacticalrmm/celery.py index b2590f1f07..0abcc0fe1a 100644 --- a/api/tacticalrmm/tacticalrmm/celery.py +++ b/api/tacticalrmm/tacticalrmm/celery.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, unicode_literals import os +from datetime import timedelta from celery import Celery from celery.schedules import crontab @@ -36,7 +37,7 @@ }, "agent-outages-task": { "task": "agents.tasks.agent_outages_task", - "schedule": crontab(minute="*/2"), + "schedule": timedelta(seconds=150.0), }, "unsnooze-alerts": { "task": "alerts.tasks.unsnooze_alerts", @@ -50,10 +51,18 @@ "task": "core.tasks.cache_db_fields_task", "schedule": crontab(minute="*/3", hour="*"), }, - "handle-resolved-stuff": { - "task": "core.tasks.handle_resolved_stuff", + "sync-scheduled-tasks": { + "task": "core.tasks.sync_scheduled_tasks", "schedule": crontab(minute="*/2", hour="*"), }, + "resolve-pending-actions": { + "task": "core.tasks.resolve_pending_actions", + "schedule": timedelta(seconds=100.0), + }, + "resolve-alerts-task": { + "task": "core.tasks.resolve_alerts_task", + "schedule": timedelta(seconds=80.0), + }, } diff --git a/api/tacticalrmm/tacticalrmm/constants.py b/api/tacticalrmm/tacticalrmm/constants.py index 7316e71c98..141e0e5be0 100644 --- a/api/tacticalrmm/tacticalrmm/constants.py +++ b/api/tacticalrmm/tacticalrmm/constants.py @@ -24,6 +24,12 @@ def __str__(self): AGENT_STATUS_OFFLINE = "offline" AGENT_STATUS_OVERDUE = "overdue" +REDIS_LOCK_EXPIRE = 60 * 60 * 2 # Lock expires in 2 hours +RESOLVE_ALERTS_LOCK = "resolve-alerts-lock-key" +SYNC_SCHED_TASK_LOCK = "sync-sched-tasks-lock-key" +AGENT_OUTAGES_LOCK = "agent-outages-task-lock-key" +ORPHANED_WIN_TASK_LOCK = "orphaned-win-task-lock-key" + class GoArch(models.TextChoices): AMD64 = "amd64", "amd64" diff --git a/api/tacticalrmm/tacticalrmm/helpers.py b/api/tacticalrmm/tacticalrmm/helpers.py index e7054e99a4..d19674e4bf 100644 --- a/api/tacticalrmm/tacticalrmm/helpers.py +++ b/api/tacticalrmm/tacticalrmm/helpers.py @@ -1,6 +1,8 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse import pytz +import random from django.conf import settings from django.utils import timezone as djangotime from rest_framework import status @@ -46,3 +48,28 @@ def date_is_in_past(*, datetime_obj: "datetime", agent_tz: str) -> bool: localized = agent_pytz.localize(datetime_obj) utc_time = localized.astimezone(pytz.utc) return now > utc_time + + +def get_webdomain() -> str: + return urlparse(settings.CORS_ORIGIN_WHITELIST[0]).netloc + + +def rand_range(min: int, max: int) -> float: + """ + Input is milliseconds. + Returns float truncated to 2 decimals. + """ + return round(random.uniform(min, max) / 1000, 2) + + +def setup_nats_options() -> dict[str, Any]: + nats_std_port, _ = get_nats_ports() + opts = { + "servers": f"tls://{settings.ALLOWED_HOSTS[0]}:{nats_std_port}", + "user": "tacticalrmm", + "name": "trmm-django", + "password": settings.SECRET_KEY, + "connect_timeout": 3, + "max_reconnect_attempts": 2, + } + return opts diff --git a/api/tacticalrmm/tacticalrmm/settings.py b/api/tacticalrmm/tacticalrmm/settings.py index 1b5f612f41..78c7183259 100644 --- a/api/tacticalrmm/tacticalrmm/settings.py +++ b/api/tacticalrmm/tacticalrmm/settings.py @@ -20,21 +20,21 @@ AUTH_USER_MODEL = "accounts.User" # latest release -TRMM_VERSION = "0.15.6" +TRMM_VERSION = "0.15.7" # https://github.com/amidaware/tacticalrmm-web -WEB_VERSION = "0.101.11" +WEB_VERSION = "0.101.13" # bump this version everytime vue code is changed # to alert user they need to manually refresh their browser -APP_VER = "0.0.176" +APP_VER = "0.0.177" # https://github.com/amidaware/rmmagent LATEST_AGENT_VER = "2.4.4" -MESH_VER = "1.1.1" +MESH_VER = "1.1.2" -NATS_SERVER_VER = "2.9.10" +NATS_SERVER_VER = "2.9.11" # for the update script, bump when need to recreate venv PIP_VER = "34" diff --git a/api/tacticalrmm/tacticalrmm/utils.py b/api/tacticalrmm/tacticalrmm/utils.py index a9149ff806..64c9f231b8 100644 --- a/api/tacticalrmm/tacticalrmm/utils.py +++ b/api/tacticalrmm/tacticalrmm/utils.py @@ -3,6 +3,9 @@ import subprocess import tempfile import time +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from functools import wraps from typing import List, Optional, Union import pytz @@ -11,6 +14,8 @@ from channels.db import database_sync_to_async from django.conf import settings from django.contrib.auth.models import AnonymousUser +from django.core.cache import cache +from django.db import connection from django.http import FileResponse from knox.auth import TokenAuthentication from rest_framework.response import Response @@ -21,6 +26,7 @@ from tacticalrmm.constants import ( MONTH_DAYS, MONTHS, + REDIS_LOCK_EXPIRE, WEEK_DAYS, WEEKS, AgentPlat, @@ -338,7 +344,11 @@ def replace_db_values( # check if attr exists and isn't a function if hasattr(obj, temp[1]) and not callable(getattr(obj, temp[1])): - value = f"'{getattr(obj, temp[1])}'" if quotes else getattr(obj, temp[1]) + temp1 = getattr(obj, temp[1]) + if shell == ScriptShell.POWERSHELL and isinstance(temp1, str) and "'" in temp1: + temp1 = temp1.replace("'", "''") + + value = f"'{temp1}'" if quotes else temp1 elif CustomField.objects.filter(model=model, name=temp[1]).exists(): @@ -368,6 +378,13 @@ def replace_db_values( elif value is not None and field.type == CustomFieldType.CHECKBOX: value = format_shell_bool(value, shell) else: + if ( + shell == ScriptShell.POWERSHELL + and isinstance(value, str) + and "'" in value + ): + value = value.replace("'", "''") + value = f"'{value}'" if quotes else value else: @@ -401,3 +418,54 @@ def format_shell_bool(value: bool, shell: Optional[str]) -> str: return "$True" if value else "$False" return "1" if value else "0" + + +# https://docs.celeryq.dev/en/latest/tutorials/task-cookbook.html#cookbook-task-serial +@contextmanager +def redis_lock(lock_id, oid): + timeout_at = time.monotonic() + REDIS_LOCK_EXPIRE - 3 + status = cache.add(lock_id, oid, REDIS_LOCK_EXPIRE) + try: + yield status + finally: + if time.monotonic() < timeout_at and status: + cache.delete(lock_id) + + +# https://stackoverflow.com/a/57794016 +class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor): + """ + When a function is passed into the ThreadPoolExecutor via either submit() or map(), + this will wrap the function, and make sure that close_django_db_connection() is called + inside the thread when it's finished so Django doesn't leak DB connections. + + Since map() calls submit(), only submit() needs to be overwritten. + """ + + def close_django_db_connection(self): + connection.close() + + def generate_thread_closing_wrapper(self, fn): + @wraps(fn) + def new_func(*args, **kwargs): + try: + return fn(*args, **kwargs) + finally: + self.close_django_db_connection() + + return new_func + + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + fn = self.generate_thread_closing_wrapper(fn=fn) + elif not args: + raise TypeError( + "descriptor 'submit' of 'ThreadPoolExecutor' object " + "needs an argument" + ) + elif "fn" in kwargs: + fn = self.generate_thread_closing_wrapper(fn=kwargs.pop("fn")) + self, *args = args + + return super(self.__class__, self).submit(fn, *args, **kwargs) diff --git a/api/tacticalrmm/winupdate/views.py b/api/tacticalrmm/winupdate/views.py index 2305e34671..42aa70101c 100644 --- a/api/tacticalrmm/winupdate/views.py +++ b/api/tacticalrmm/winupdate/views.py @@ -39,7 +39,7 @@ def post(self, request, agent_id): agent.delete_superseded_updates() asyncio.run(agent.nats_cmd({"func": "getwinupdates"}, wait=False)) - return Response(f"A Windows update scan will performed on {agent.hostname}") + return Response(f"A Windows update scan will be performed on {agent.hostname}") class InstallWindowsUpdates(APIView): diff --git a/docker/containers/tactical-meshcentral/entrypoint.sh b/docker/containers/tactical-meshcentral/entrypoint.sh index f5b9c25b8a..795b03ed48 100644 --- a/docker/containers/tactical-meshcentral/entrypoint.sh +++ b/docker/containers/tactical-meshcentral/entrypoint.sh @@ -10,7 +10,9 @@ set -e : "${MONGODB_PORT:=27017}" : "${NGINX_HOST_IP:=172.20.0.20}" : "${NGINX_HOST_PORT:=4443}" +: "${MESH_COMPRESSION_ENABLED:=true}" : "${MESH_PERSISTENT_CONFIG:=0}" +: "${MESH_WEBRTC_ENABLED:=false}" : "${WS_MASK_OVERRIDE:=0}" : "${SMTP_HOST:=smtp.example.com}" : "${SMTP_PORT:=587}" @@ -41,9 +43,10 @@ if [ ! -f "/home/node/app/meshcentral-data/config.json" ] || [[ "${MESH_PERSISTE "agentPong": 300, "allowHighQualityDesktop": true, "agentCoreDump": false, - "compression": true, - "wsCompression": true, - "agentWsCompression": true, + "compression": ${MESH_COMPRESSION_ENABLED}, + "wsCompression": ${MESH_COMPRESSION_ENABLED}, + "agentWsCompression": ${MESH_COMPRESSION_ENABLED}, + "webRTC": ${MESH_WEBRTC_ENABLED}, "maxInvalidLogin": { "time": 5, "count": 5, diff --git a/docker/containers/tactical-nats/dockerfile b/docker/containers/tactical-nats/dockerfile index eb84c6408d..1d66785c45 100644 --- a/docker/containers/tactical-nats/dockerfile +++ b/docker/containers/tactical-nats/dockerfile @@ -1,4 +1,4 @@ -FROM nats:2.9.10-alpine +FROM nats:2.9.11-alpine ENV TACTICAL_DIR /opt/tactical ENV TACTICAL_READY_FILE ${TACTICAL_DIR}/tmp/tactical.ready diff --git a/docker/containers/tactical/entrypoint.sh b/docker/containers/tactical/entrypoint.sh index 4bc830e30a..d796a12921 100644 --- a/docker/containers/tactical/entrypoint.sh +++ b/docker/containers/tactical/entrypoint.sh @@ -123,6 +123,7 @@ EOF python manage.py create_natsapi_conf python manage.py create_uwsgi_conf python manage.py create_installer_user + python manage.py clear_redis_celery_locks python manage.py post_update_tasks # create super user @@ -148,7 +149,7 @@ fi if [ "$1" = 'tactical-celery' ]; then check_tactical_ready - celery -A tacticalrmm worker -l info + celery -A tacticalrmm worker --autoscale=30,5 -l info fi if [ "$1" = 'tactical-celerybeat' ]; then diff --git a/restore.sh b/restore.sh index e0fd3aa072..872e8f79ac 100755 --- a/restore.sh +++ b/restore.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -SCRIPT_VERSION="45" +SCRIPT_VERSION="46" SCRIPT_URL='https://raw.githubusercontent.com/amidaware/tacticalrmm/master/restore.sh' sudo apt update @@ -294,7 +294,6 @@ npm install meshcentral@${MESH_VER} print_green 'Restoring the backend' cp $tmp_dir/rmm/local_settings.py /rmm/api/tacticalrmm/tacticalrmm/ -cp $tmp_dir/rmm/env /rmm/web/.env gzip -d $tmp_dir/rmm/debug.log.gz cp $tmp_dir/rmm/django_debug.log /rmm/api/tacticalrmm/tacticalrmm/private/log/ diff --git a/update.sh b/update.sh index 6c1ca29776..098d1c6bef 100644 --- a/update.sh +++ b/update.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -SCRIPT_VERSION="140" +SCRIPT_VERSION="141" SCRIPT_URL='https://raw.githubusercontent.com/amidaware/tacticalrmm/master/update.sh' LATEST_SETTINGS_URL='https://raw.githubusercontent.com/amidaware/tacticalrmm/master/api/tacticalrmm/tacticalrmm/settings.py' YELLOW='\033[1;33m' @@ -121,7 +121,13 @@ if ! [[ $CHECK_NATS_WEBSOCKET ]]; then fi -for i in nginx nats-api nats rmm daphne celery celerybeat +printf >&2 "${GREEN}Stopping celery and celerybeat services (this might take a while)...${NC}\n" +for i in celerybeat celery +do +sudo systemctl stop ${i} +done + +for i in nginx nats-api nats rmm daphne do printf >&2 "${GREEN}Stopping ${i} service...${NC}\n" sudo systemctl stop ${i} @@ -346,6 +352,7 @@ python manage.py load_chocos python manage.py create_installer_user python manage.py create_natsapi_conf python manage.py create_uwsgi_conf +python manage.py clear_redis_celery_locks python manage.py post_update_tasks API=$(python manage.py get_config api) WEB_VERSION=$(python manage.py get_config webversion)