From 96fddc77e1ea8d639483f4b2c6b50606d73c6bcc Mon Sep 17 00:00:00 2001 From: Tommy Beadle Date: Mon, 1 Apr 2024 14:11:03 -0400 Subject: [PATCH] Overhaul the scheduler. - Improve the handling of tasks, machines, and how they are updated in the database. Use transactions and locks appropriately so that changes are more atomic. - Remove the need for holding the machine_lock for long periods of time in the main loop and remove the need for batch scheduling tasks. - Make the code a lot cleaner and more readable, including separation of concerns among various classes. Introduce a MachineryManager class that does what the name suggests. In the future, this could gain an API that provides a way to dynamically update machines in the database without having to update a conf file and restart cuckoo.py. --- conf/default/cuckoo.conf.default | 5 - cuckoo.py | 2 +- lib/cuckoo/common/abstracts.py | 145 +++-- lib/cuckoo/common/exceptions.py | 11 + lib/cuckoo/common/utils.py | 8 +- lib/cuckoo/common/web_utils.py | 9 +- lib/cuckoo/core/analysis_manager.py | 584 ++++++++++---------- lib/cuckoo/core/database.py | 278 ++++------ lib/cuckoo/core/guest.py | 9 +- lib/cuckoo/core/machinery_manager.py | 307 +++++++++++ lib/cuckoo/core/scheduler.py | 768 ++++++++++----------------- modules/machinery/aws.py | 14 +- modules/machinery/az.py | 32 +- modules/machinery/esx.py | 2 + modules/machinery/kvm.py | 2 + modules/machinery/multi.py | 2 + modules/machinery/physical.py | 2 + modules/machinery/proxmox.py | 4 +- modules/machinery/qemu.py | 4 +- modules/machinery/virtualbox.py | 2 + modules/machinery/vmware.py | 2 + modules/machinery/vmwarerest.py | 2 + modules/machinery/vmwareserver.py | 2 + modules/machinery/vsphere.py | 8 +- utils/process.py | 4 + web/apiv2/views.py | 4 +- 26 files changed, 1111 insertions(+), 1101 deletions(-) create mode 100644 lib/cuckoo/core/machinery_manager.py diff --git a/conf/default/cuckoo.conf.default b/conf/default/cuckoo.conf.default index f8e26b023cb..4ea6cac1dd4 100644 --- a/conf/default/cuckoo.conf.default +++ b/conf/default/cuckoo.conf.default @@ -32,11 +32,6 @@ machinery_screenshots = off scaling_semaphore = off # A configurable wait time between updating the limit value of the scaling bounded semaphore scaling_semaphore_update_timer = 10 -# Allow more than one task scheduled to be assigned at once for better scaling -# A switch to allow batch task assignment, a method that can more efficiently assign tasks to available machines -batch_scheduling = off -# The maximum number of tasks assigned to machines per batch, optimal value dependent on deployment -max_batch_count = 20 # Enable creation of memory dump of the analysis machine before shutting # down.
Even if turned off, this functionality can also be enabled at diff --git a/cuckoo.py b/cuckoo.py index 90662238fd8..5ecc4017d6b 100644 --- a/cuckoo.py +++ b/cuckoo.py @@ -117,7 +117,7 @@ def cuckoo_main(max_analysis_count=0): parser.add_argument("-v", "--version", action="version", version="You are running Cuckoo Sandbox {0}".format(CUCKOO_VERSION)) parser.add_argument("-a", "--artwork", help="Show artwork", action="store_true", required=False) parser.add_argument("-t", "--test", help="Test startup", action="store_true", required=False) - parser.add_argument("-m", "--max-analysis-count", help="Maximum number of analyses", type=int, required=False) + parser.add_argument("-m", "--max-analysis-count", help="Maximum number of analyses", type=int, required=False, default=0) parser.add_argument( "-s", "--stop", diff --git a/lib/cuckoo/common/abstracts.py b/lib/cuckoo/common/abstracts.py index 0e3a4dee93a..7f208babc39 100644 --- a/lib/cuckoo/common/abstracts.py +++ b/lib/cuckoo/common/abstracts.py @@ -4,7 +4,6 @@ # See the file 'docs/LICENSE' for copying permission. import datetime -import inspect import io import logging import os @@ -38,7 +37,7 @@ from lib.cuckoo.common.path_utils import path_exists, path_mkdir from lib.cuckoo.common.url_validate import url as url_validator from lib.cuckoo.common.utils import create_folder, get_memdump_path, load_categories -from lib.cuckoo.core.database import Database +from lib.cuckoo.core.database import Database, Machine, _Database try: import re2 as re @@ -107,42 +106,48 @@ class Machinery: # Default label used in machinery configuration file to supply virtual # machine name/label/vmx path. Override it if you dubbed it in another # way. - LABEL = "label" + LABEL: str = "label" + + # This must be defined in sub-classes. + module_name: str def __init__(self): - self.module_name = "" self.options = None # Database pointer. - self.db = Database() - # Machine table is cleaned to be filled from configuration file - # at each start. - self.db.clean_machines() + self.db: _Database = Database() + + # Find its configuration file. + conf = os.path.join(CUCKOO_ROOT, "conf", f"{self.module_name}.conf") + if not path_exists(conf): + raise CuckooCriticalError( + f'The configuration file for machine manager "{self.module_name}" does not exist at path: {conf}' + ) + self.set_options(Config(self.module_name)) - def set_options(self, options: dict): + def set_options(self, options: dict) -> None: """Set machine manager options. @param options: machine manager options dict. """ self.options = options + mmanager_opts = self.options.get(self.module_name) + if not isinstance(mmanager_opts["machines"], list): + mmanager_opts["machines"] = str(mmanager_opts["machines"]).strip().split(",") + + def initialize(self) -> None: + """Read, load, and verify machines configuration.""" + # Machine table is cleaned to be filled from configuration file + # at each start. + self.db.clean_machines() - def initialize(self, module_name): - """Read, load, and verify machines configuration. - @param module_name: module name. - """ # Load. - self._initialize(module_name) + self._initialize() # Run initialization checks. self._initialize_check() - def _initialize(self, module_name): - """Read configuration. - @param module_name: module name. 
- """ - self.module_name = module_name - mmanager_opts = self.options.get(module_name) - if not isinstance(mmanager_opts["machines"], list): - mmanager_opts["machines"] = str(mmanager_opts["machines"]).strip().split(",") - + def _initialize(self) -> None: + """Read configuration.""" + mmanager_opts = self.options.get(self.module_name) for machine_id in mmanager_opts["machines"]: try: machine_opts = self.options.get(machine_id.strip()) @@ -198,7 +203,7 @@ def _initialize(self, module_name): log.warning("Configuration details about machine %s are missing: %s", machine_id.strip(), e) continue - def _initialize_check(self): + def _initialize_check(self) -> None: """Runs checks against virtualization software when a machine manager is initialized. @note: in machine manager modules you may override or superclass his method. @raise CuckooMachineError: if a misconfiguration or a unkown vm state is found. @@ -208,20 +213,24 @@ def _initialize_check(self): except NotImplementedError: return + self.shutdown_running_machines(configured_vms) + self.check_screenshot_support() + + if not cfg.timeouts.vm_state: + raise CuckooCriticalError("Virtual machine state change timeout setting not found, please add it to the config file") + + def check_screenshot_support(self) -> None: # If machinery_screenshots are enabled, check the machinery supports it. - if cfg.cuckoo.machinery_screenshots: - # inspect function members available on the machinery class - cls_members = inspect.getmembers(self.__class__, predicate=inspect.isfunction) - for name, function in cls_members: - if name != Machinery.screenshot.__name__: - continue - if Machinery.screenshot == function: - msg = f"machinery {self.module_name} does not support machinery screenshots" - raise CuckooCriticalError(msg) - break - else: - raise NotImplementedError(f"missing machinery method: {Machinery.screenshot.__name__}") + if not cfg.cuckoo.machinery_screenshots: + return + + # inspect function members available on the machinery class + func = getattr(self.__class__, "screenshot") + if func == Machinery.screenshot: + msg = f"machinery {self.module_name} does not support machinery screenshots" + raise CuckooCriticalError(msg) + def shutdown_running_machines(self, configured_vms: List[str]) -> None: for machine in self.machines(): # If this machine is already in the "correct" state, then we # go on to the next machine. @@ -236,16 +245,13 @@ def _initialize_check(self): msg = f"Please update your configuration. Unable to shut '{machine.label}' down or find the machine in its proper state: {e}" raise CuckooCriticalError(msg) from e - if not cfg.timeouts.vm_state: - raise CuckooCriticalError("Virtual machine state change timeout setting not found, please add it to the config file") - def machines(self): """List virtual machines. @return: virtual machines list """ return self.db.list_machines(include_reserved=True) - def availables(self, label=None, platform=None, tags=None, arch=None, include_reserved=False, os_version=[]): + def availables(self, label=None, platform=None, tags=None, arch=None, include_reserved=False, os_version=None): """How many (relevant) machines are free. @param label: machine ID. @param platform: machine platform. 
@@ -257,32 +263,15 @@ def availables(self, label=None, platform=None, tags=None, arch=None, include_re label=label, platform=platform, tags=tags, arch=arch, include_reserved=include_reserved, os_version=os_version ) - def acquire(self, machine_id=None, platform=None, tags=None, arch=None, os_version=[], need_scheduled=False): - """Acquire a machine to start analysis. - @param machine_id: machine ID. - @param platform: machine platform. - @param tags: machine tags - @param arch: machine arch - @param os_version: tags to filter per OS version. Ex: winxp, win7, win10, win11 - @param need_scheduled: should the result be filtered on 'scheduled' machine status - @return: machine or None. - """ - if machine_id: - return self.db.lock_machine(label=machine_id, need_scheduled=need_scheduled) - elif platform: - return self.db.lock_machine( - platform=platform, tags=tags, arch=arch, os_version=os_version, need_scheduled=need_scheduled - ) - return self.db.lock_machine(tags=tags, arch=arch, os_version=os_version, need_scheduled=need_scheduled) - - def get_machines_scheduled(self): - return self.db.get_machines_scheduled() + def scale_pool(self, machine: Machine) -> None: + """This can be overridden in sub-classes to scale the pool of machines once one has been acquired.""" + return - def release(self, label=None): + def release(self, machine: Machine) -> Machine: """Release a machine. @param machine: the Machine to release. """ - self.db.unlock_machine(label) + return self.db.unlock_machine(machine) def running(self): """Returns running virtual machines. @@ -290,6 +279,9 @@ """ return self.db.list_machines(locked=True) + def running_count(self): + return self.db.count_machines_running() + def screenshot(self, label, path): """Screenshot a running virtual machine. @param label: machine name @@ -302,9 +294,10 @@ def shutdown(self): """Shutdown the machine manager. Kills all alive machines. @raise CuckooMachineError: if unable to stop machine. """ - if len(self.running()) > 0: - log.info("Still %d guests still alive, shutting down...", len(self.running())) - for machine in self.running(): + running = self.running() + if len(running) > 0: + log.info("%d guests still alive, shutting down...", len(running)) + for machine in running: try: self.stop(machine.label) except CuckooMachineError as e: @@ -389,23 +382,12 @@ class LibVirtMachinery(Machinery): ABORTED = "abort" def __init__(self): - - if not categories_need_VM: - return - if not HAVE_LIBVIRT: raise CuckooDependencyError( "Unable to import libvirt. Ensure that you properly installed it by running: cd /opt/CAPEv2/ ; sudo -u cape poetry run extra/libvirt_installer.sh" ) - super(LibVirtMachinery, self).__init__() - - def initialize(self, module): - """Initialize machine manager module. Override default to set proper - connection string. - @param module: machine manager module - """ - super(LibVirtMachinery, self).initialize(module) + super().__init__() def _initialize_check(self): """Runs all checks when a machine manager is initialized. @@ -420,7 +402,7 @@ def _initialize_check(self): # Base checks. Also attempts to shutdown any machines which are # currently still active. - super(LibVirtMachinery, self)._initialize_check() + super()._initialize_check() def start(self, label): """Starts a virtual machine. @@ -429,14 +411,17 @@ def start(self, label): """ log.debug("Starting machine %s", label) + vm_info = self.db.view_machine_by_label(label) + if vm_info is None: + msg = f"Unable to find machine with label {label} in database."
+ raise CuckooMachineError(msg) + if self._status(label) != self.POWEROFF: msg = f"Trying to start a virtual machine that has not been turned off {label}" raise CuckooMachineError(msg) conn = self._connect(label) - vm_info = self.db.view_machine_by_label(label) - snapshot_list = self.vms[label].snapshotListNames(flags=0) # If a snapshot is configured try to use it. diff --git a/lib/cuckoo/common/exceptions.py b/lib/cuckoo/common/exceptions.py index 57986b682bb..1e04be88912 100644 --- a/lib/cuckoo/common/exceptions.py +++ b/lib/cuckoo/common/exceptions.py @@ -15,6 +15,11 @@ class CuckooStartupError(CuckooCriticalError): pass +class CuckooDatabaseInitializationError(CuckooCriticalError): + def __str__(self): + return "The database has not been initialized yet. You must call init_database before attempting to use it." + + class CuckooDatabaseError(CuckooCriticalError): """Cuckoo database error.""" @@ -33,6 +38,12 @@ class CuckooOperationalError(Exception): pass +class CuckooUnserviceableTaskError(CuckooOperationalError): + """There are no machines in the pool that can service the task.""" + + pass + + class CuckooMachineError(CuckooOperationalError): """Error managing analysis machine.""" diff --git a/lib/cuckoo/common/utils.py b/lib/cuckoo/common/utils.py index bbf91f5de1e..4945719e886 100644 --- a/lib/cuckoo/common/utils.py +++ b/lib/cuckoo/common/utils.py @@ -19,7 +19,7 @@ import zipfile from datetime import datetime from io import BytesIO -from typing import Tuple, Union +from typing import Final, List, Tuple, Union from data.family_detection_names import family_detection_names from lib.cuckoo.common import utils_dicts @@ -89,10 +89,12 @@ def arg_name_clscontext(arg_val): sanitize_len = config.cuckoo.get("sanitize_len", 32) sanitize_to_len = config.cuckoo.get("sanitize_to_len", 24) +CATEGORIES_NEEDING_VM: Final[Tuple[str, ...]] = ("file", "url") -def load_categories(): + +def load_categories() -> Tuple[List[str], bool]: analyzing_categories = [category.strip() for category in config.cuckoo.categories.split(",")] - needs_VM = any([category in analyzing_categories for category in ("file", "url")]) + needs_VM = any(category in analyzing_categories for category in CATEGORIES_NEEDING_VM) return analyzing_categories, needs_VM diff --git a/lib/cuckoo/common/web_utils.py b/lib/cuckoo/common/web_utils.py index f7bfa24bcc2..938b257e2fe 100644 --- a/lib/cuckoo/common/web_utils.py +++ b/lib/cuckoo/common/web_utils.py @@ -413,12 +413,14 @@ def statistics(s_days: int) -> dict: details[module_name.split(".")[-1]].setdefault(name, entry) top_samples = {} - session = db.Session() added_tasks = ( - session.query(Task).join(Sample, Task.sample_id == Sample.id).filter(Task.added_on.between(date_since, date_till)).all() + db.session.query(Task).join(Sample, Task.sample_id == Sample.id).filter(Task.added_on.between(date_since, date_till)).all() ) tasks = ( - session.query(Task).join(Sample, Task.sample_id == Sample.id).filter(Task.completed_on.between(date_since, date_till)).all() + db.session.query(Task) .join(Sample, Task.sample_id == Sample.id) .filter(Task.completed_on.between(date_since, date_till)) .all() ) details["total"] = len(tasks) details["average"] = f"{round(details['total'] / s_days, 2):.2f}" @@ -487,7 +489,6 @@ def statistics(s_days: int) -> dict: details["detections"] = top_detections(date_since=date_since) details["asns"] = top_asn(date_since=date_since) - session.close() return details diff --git a/lib/cuckoo/core/analysis_manager.py b/lib/cuckoo/core/analysis_manager.py index
9c79f3f78bc..1f9a0cdc608 100644 --- a/lib/cuckoo/core/analysis_manager.py +++ b/lib/cuckoo/core/analysis_manager.py @@ -1,8 +1,11 @@ +import contextlib +import functools import logging import os +import queue import shutil import threading -import time +from typing import Any, Callable, Generator, MutableMapping, Optional, Tuple from lib.cuckoo.common.config import Config from lib.cuckoo.common.constants import CUCKOO_ROOT @@ -17,14 +20,12 @@ from lib.cuckoo.common.objects import File from lib.cuckoo.common.path_utils import path_delete, path_exists, path_mkdir from lib.cuckoo.common.utils import convert_to_printable, create_folder, free_space_monitor, get_memdump_path -from lib.cuckoo.core import scheduler -from lib.cuckoo.core.database import TASK_COMPLETED, Database +from lib.cuckoo.core.database import TASK_COMPLETED, TASK_PENDING, TASK_RUNNING, Database, Guest, Machine, Task, _Database from lib.cuckoo.core.guest import GuestManager -from lib.cuckoo.core.log import task_log_stop +from lib.cuckoo.core.machinery_manager import MachineryManager from lib.cuckoo.core.plugins import RunAuxiliary from lib.cuckoo.core.resultserver import ResultServer from lib.cuckoo.core.rooter import _load_socks5_operational, rooter, vpns -from lib.cuckoo.core.scheduler import CuckooDeadMachine log = logging.getLogger(__name__) @@ -38,13 +39,49 @@ except ImportError: print("Missed dependency: pip3 install psutil") -web_cfg = Config("web") -enable_trim = int(web_cfg.general.enable_trim) -expose_vnc_port = web_cfg.guacamole.enabled -routing = Config("routing") latest_symlink_lock = threading.Lock() +class CuckooDeadMachine(Exception): + """Exception thrown when a machine turns dead. + + When this exception has been thrown, the analysis task will start again, + and will try to use another machine, when available. + """ + + def __init__(self, machine_name: str): + super().__init__() + self.machine_name = machine_name + + def __str__(self) -> str: + return f"{self.machine_name} is dead!" + + +def main_thread_only(func): + # Since most methods of the AnalysisManager class will be called within a child + # thread, let's decorate ones that must only be called from the main thread so + # that it's easy to differentiate between them. + @functools.wraps(func) + def inner(*args, **kwargs): + if threading.current_thread() is not threading.main_thread(): + raise AssertionError(f"{func.__name__} must only be called from the main thread") + return func(*args, **kwargs) + + return inner + + +class AnalysisLogger(logging.LoggerAdapter): + """This class will be used by AnalysisManager so that all of its log entries + will include the task ID, without having to explicitly include it in the log message. + """ + + def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]: + task_id: Optional[int] = self.extra.get("task_id") if self.extra is not None else None + if task_id is not None: + msg = f"Task #{task_id}: {msg}" + return msg, kwargs + + class AnalysisManager(threading.Thread): """Analysis Manager. @@ -54,19 +91,30 @@ class AnalysisManager(threading.Thread): complete the analysis and store, process and report its results. 
""" - def __init__(self, task, error_queue): + def __init__( + self, + task: Task, + machine: Optional[Machine], + machinery_manager: Optional[MachineryManager], + error_queue: queue.Queue, + done_callback: Optional[Callable[["AnalysisManager"], None]] = None, + ): """@param task: task object containing the details for the analysis.""" - threading.Thread.__init__(self) + super().__init__(name=f"task-{task.id}", daemon=True) + self.db: _Database = Database() self.task = task - self.errors = error_queue + self.log = AnalysisLogger(log, {"task_id": self.task.id}) + self.machine = machine + self.machinery_manager = machinery_manager + self.error_queue = error_queue + self.done_callback = done_callback + self.guest: Optional[Guest] = None self.cfg = Config() self.aux_cfg = Config("auxiliary") self.storage = "" self.screenshot_path = "" self.num_screenshots = 0 self.binary = "" - self.machine = None - self.db = Database() self.interface = None self.rt_table = None self.route = None @@ -74,6 +122,23 @@ def __init__(self, task, error_queue): self.reject_segments = None self.reject_hostports = None + @main_thread_only + def prepare_task_and_machine_to_start(self) -> None: + """If the task doesn't use a machine, just set its state to running. + Otherwise, update the task and machine in the database so that the + task is running, the machine is locked and assigned to the task, and + create a Guest row for the analysis. + """ + self.db.set_task_status(self.task, TASK_RUNNING) + if self.machine and self.machinery_manager: + self.db.assign_machine_to_task(self.task, self.machine) + self.db.lock_machine(self.machine) + self.guest = self.db.create_guest( + self.machine, + self.machinery_manager.machinery.__class__.__name__, + self.task, + ) + def init_storage(self): """Initialize analysis storage folder.""" self.storage = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(self.task.id)) @@ -82,7 +147,7 @@ def init_storage(self): # If the analysis storage folder already exists, we need to abort the # analysis or previous results will be overwritten and lost. 
if path_exists(self.storage): - log.error("Task #%s: Analysis results folder already exists at path '%s', analysis aborted", self.task.id, self.storage) + self.log.error("Analysis results folder already exists at path '%s', analysis aborted", self.storage) return False # If we're not able to create the analysis storage folder, we have to @@ -90,7 +155,7 @@ def init_storage(self): try: create_folder(folder=self.storage) except CuckooOperationalError: - log.error("Task #%s: Unable to create analysis folder %s", self.task.id, self.storage) + self.log.error("Unable to create analysis folder %s", self.storage) return False return True @@ -100,11 +165,7 @@ def check_file(self, sha256): sample = self.db.view_sample(self.task.sample_id) if sample and sha256 != sample.sha256: - log.error( - "Task #%s: Target file has been modified after submission: '%s'", - self.task.id, - convert_to_printable(self.task.target), - ) + self.log.error("Target file has been modified after submission: '%s'", convert_to_printable(self.task.target)) return False return True @@ -112,25 +173,22 @@ def check_file(self, sha256): def store_file(self, sha256): """Store a copy of the file being analyzed.""" if not path_exists(self.task.target): - log.error( - "Task #%s: The file to analyze does not exist at path '%s', analysis aborted", - self.task.id, - convert_to_printable(self.task.target), + self.log.error( + "The file to analyze does not exist at path '%s', analysis aborted", convert_to_printable(self.task.target) ) return False self.binary = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256) if path_exists(self.binary): - log.info("Task #%s: File already exists at '%s'", self.task.id, self.binary) + self.log.info("File already exists at '%s'", self.binary) else: # TODO: do we really need to abort the analysis in case we are not able to store a copy of the file? try: shutil.copy(self.task.target, self.binary) except (IOError, shutil.Error): - log.error( - "Task #%s: Unable to store file from '%s' to '%s', analysis aborted", - self.task.id, + self.log.error( + "Unable to store file from '%s' to '%s', analysis aborted", self.task.target, self.binary, ) @@ -143,94 +201,26 @@ def store_file(self, sha256): else: shutil.copy(self.binary, new_binary_path) except (AttributeError, OSError) as e: - log.error("Task #%s: Unable to create symlink/copy from '%s' to '%s': %s", self.task.id, self.binary, self.storage, e) + self.log.error("Unable to create symlink/copy from '%s' to '%s': %s", self.binary, self.storage, e) return True def screenshot_machine(self): if not self.cfg.cuckoo.machinery_screenshots: return - if self.machine is None: - log.error("Task #%s: screenshot not possible, no machine acquired yet", self.task.id) + if self.machinery_manager is None or self.machine is None: + self.log.error("screenshot not possible, no machine acquired yet") return # same format and filename approach here as VM-based screenshots self.num_screenshots += 1 screenshot_filename = f"{str(self.num_screenshots).rjust(4, '0')}.jpg" screenshot_path = os.path.join(self.screenshot_path, screenshot_filename) - scheduler.machinery.screenshot(self.machine.label, screenshot_path) - - def acquire_machine(self): - """Acquire an analysis machine from the pool of available ones.""" - machine = None - orphan = False - # Start a loop to acquire a machine to run the analysis on. - while True: - scheduler.machine_lock.acquire() - - # If the user specified a specific machine ID, a platform to be - # used or machine tags acquire the machine accordingly. 
- task_archs, task_tags = self.db._task_arch_tags_helper(self.task) - os_version = self.db._package_vm_requires_check(self.task.package) - - # In some cases it's possible that we enter this loop without having any available machines. We should make sure this is not - # such case, or the analysis task will fail completely. - if not scheduler.machinery.availables( - label=self.task.machine, platform=self.task.platform, tags=task_tags, arch=task_archs, os_version=os_version - ): - scheduler.machine_lock.release() - log.debug( - "Task #%s: no machine available yet for machine '%s', platform '%s' or tags '%s'.", - self.task.id, - self.task.machine, - self.task.platform, - self.task.tags, - ) - time.sleep(1) - continue - if self.cfg.cuckoo.batch_scheduling and not orphan: - machine = scheduler.machinery.acquire( - machine_id=self.task.machine, - platform=self.task.platform, - tags=task_tags, - arch=task_archs, - os_version=os_version, - need_scheduled=True, - ) - else: - machine = scheduler.machinery.acquire( - machine_id=self.task.machine, - platform=self.task.platform, - tags=task_tags, - arch=task_archs, - os_version=os_version, - need_scheduled=True, - ) - - # If no machine is available at this moment, wait for one second and try again. - if not machine: - scheduler.machine_lock.release() - log.debug( - "Task #%s: no machine available yet for machine '%s', platform '%s' or tags '%s'.", - self.task.id, - self.task.machine, - self.task.platform, - self.task.tags, - ) - time.sleep(1) - orphan = True - else: - log.info( - "Task #%s: acquired machine %s (label=%s, arch=%s, platform=%s)", - self.task.id, - machine.name, - machine.label, - machine.arch, - machine.platform, - ) - break - - self.machine = machine + try: + self.machinery_manager.machinery.screenshot(self.machine.label, screenshot_path) + except Exception as err: + self.log.warning("Failed to take screenshot of %s: %s", self.machine.label, err) + self.num_screenshots -= 1 def build_options(self): """Generate analysis options. @@ -243,13 +233,13 @@ def build_options(self): "category": self.task.category, "target": self.task.target, "package": self.task.package, - "options": self.task.options, + "options": self.get_machine_specific_options(self.task.options), "enforce_timeout": self.task.enforce_timeout, "clock": self.task.clock, "terminate_processes": self.cfg.cuckoo.terminate_processes, "upload_max_size": self.cfg.resultserver.upload_max_size, "do_upload_max_size": int(self.cfg.resultserver.do_upload_max_size), - "enable_trim": enable_trim, + "enable_trim": int(Config("web").general.enable_trim), "timeout": self.task.timeout or self.cfg.timeouts.default, } @@ -267,18 +257,18 @@ def build_options(self): return options - def category_checks(self): + def category_checks(self) -> Optional[bool]: if self.task.category in ("file", "pcap", "static"): sha256 = File(self.task.target).get_sha256() # Check whether the file has been changed for some unknown reason. # And fail this analysis if it has been modified. if not self.check_file(sha256): - log.debug("check file") + self.log.debug("check file") return False # Store a copy of the original file. 
if not self.store_file(sha256): - log.debug("store file") + self.log.debug("store file") return False if self.task.category in ("pcap", "static"): @@ -294,241 +284,223 @@ def category_checks(self): try: path_mkdir(os.path.join(self.storage, dirname)) except Exception: - log.debug("Failed to create folder %s", dirname) + self.log.debug("Failed to create folder %s", dirname) return True - def launch_analysis(self): - """Start analysis.""" - global active_analysis_count - succeeded = False - dead_machine = False - self.socks5s = _load_socks5_operational() - aux = False - # Initialize the analysis folders. - if not self.init_storage(): - log.debug("Failed to initialize the analysis folder") - return False - - with self.db.session.begin(): - category_early_escape = self.category_checks() - if isinstance(category_early_escape, bool): - return category_early_escape + return None - log.info( - "Task #%s: Starting analysis of %s '%s'", - self.task.id, - self.task.category.upper(), - convert_to_printable(self.task.target), - ) + @contextlib.contextmanager + def machine_running(self) -> Generator[None, None, None]: + assert self.machinery_manager and self.machine and self.guest - # Acquire analysis machine. try: with self.db.session.begin(): - self.acquire_machine() - self.set_machine_specific_options() - guest_log = self.db.set_task_vm_and_guest_start( - self.task.id, - self.machine.name, - self.machine.label, - self.task.platform, - self.machine.id, - scheduler.machinery.__class__.__name__, - self.task.options, - ) - self.db.session.flush() - self.db.session.expunge(self.machine) - # At this point we can tell the ResultServer about it. - except CuckooOperationalError as e: - scheduler.machine_lock.release() - log.error("Task #%s: Cannot acquire machine: %s", self.task.id, e, exc_info=True) - return False + self.machinery_manager.start_machine(self.machine) - try: - unlocked = False + yield - # Mark the selected analysis machine in the database as started. - # Start the machine. - with self.db.session.begin(): - scheduler.machinery.start(self.machine.label) + # Take a memory dump of the machine before shutting it off. + self.dump_machine_memory() - # By the time start returns it will have fully started the Virtual - # Machine. We can now safely release the machine lock. - scheduler.machine_lock.release() - unlocked = True + except (CuckooMachineError, CuckooGuestCriticalTimeout) as e: + # This machine has turned dead, so we'll throw an exception + # which informs the AnalysisManager that it should analyze + # this task again with another available machine. + self.log.exception(str(e)) - # Generate the analysis configuration file. - options = self.build_options() + # Remove the guest from the database, so that we can assign a + # new guest when the task is being analyzed with another machine. + with self.db.session.begin(): + self.db.guest_remove(self.guest.id) + self.db.assign_machine_to_task(self.task, None) + self.machinery_manager.machinery.delete_machine(self.machine.name) - if expose_vnc_port and hasattr(scheduler.machinery, "store_vnc_port"): - scheduler.machinery.store_vnc_port(self.machine.label, self.task.id) + # Remove the analysis directory that has been created so + # far, as perform_analysis() is going to be doing that again. 
+ shutil.rmtree(self.storage) - try: - ResultServer().add_task(self.task, self.machine) - except Exception as e: - with self.db.session.begin(): - scheduler.machinery.release(self.machine.label) - log.exception(e, exc_info=True) - self.errors.put(e) + raise CuckooDeadMachine(self.machine.name) from e - aux = RunAuxiliary(task=self.task, machine=self.machine) - - # Enable network routing. - self.route_network() + with self.db.session.begin(): + try: + self.machinery_manager.stop_machine(self.machine) + except CuckooMachineError as e: + self.log.warning("Unable to stop machine %s: %s", self.machine.label, e) + # Explicitly rollback since we don't re-raise the exception. + self.db.session.rollback() + try: + # Release the analysis machine, but only if the machine is not dead. with self.db.session.begin(): - aux.start() - - # Initialize the guest manager. - guest = GuestManager(self.machine.name, self.machine.ip, self.machine.platform, self.task.id, self) + self.machinery_manager.machinery.release(self.machine) + except CuckooMachineError as e: + self.log.error( + "Unable to release machine %s, reason %s. You might need to restore it manually", + self.machine.label, + e, + ) - with self.db.session.begin(): - options["clock"] = self.db.update_clock(self.task.id) - self.db.guest_set_status(self.task.id, "starting") - # Start the analysis. - guest.start_analysis(options) - if guest.get_status_from_db() == "starting": - guest.set_status_in_db("running") - guest.wait_for_completion() - - guest.set_status_in_db("stopping") - succeeded = True - except (CuckooMachineError, CuckooGuestCriticalTimeout) as e: - if not unlocked: - scheduler.machine_lock.release() - log.error(str(e), extra={"task_id": self.task.id}, exc_info=True) - dead_machine = True - except CuckooGuestError as e: - if not unlocked: - scheduler.machine_lock.release() - log.error(str(e), extra={"task_id": self.task.id}, exc_info=True) - finally: - # Stop Auxiliary modules. - if aux: - with self.db.session.begin(): - aux.stop() + def dump_machine_memory(self) -> None: + if not self.cfg.cuckoo.memory_dump and not self.task.memory: + return - # Take a memory dump of the machine before shutting it off. - if self.cfg.cuckoo.memory_dump or self.task.memory: - try: - dump_path = get_memdump_path(self.task.id) - need_space, space_available = free_space_monitor(os.path.dirname(dump_path), return_value=True) - if need_space: - log.error("Not enough free disk space! Could not dump ram (Only %d MB!)", space_available) - else: - scheduler.machinery.dump_memory(self.machine.label, dump_path) - except NotImplementedError: - log.error("The memory dump functionality is not available for the current machine manager") - - except CuckooMachineError as e: - log.error(e, exc_info=True) + assert self.machinery_manager and self.machine - try: - # Stop the analysis machine. - with self.db.session.begin(): - scheduler.machinery.stop(self.machine.label) - - except CuckooMachineError as e: - log.warning("Task #%s: Unable to stop machine %s: %s", self.task.id, self.machine.label, e) + try: + dump_path = get_memdump_path(self.task.id) + need_space, space_available = free_space_monitor(os.path.dirname(dump_path), return_value=True) + if need_space: + self.log.error("Not enough free disk space! 
Could not dump ram (Only %d MB!)", space_available) + else: + self.machinery_manager.machinery.dump_memory(self.machine.label, dump_path) + except NotImplementedError: + self.log.error("The memory dump functionality is not available for the current machine manager") - # Mark the machine in the database as stopped. Unless this machine - # has been marked as dead, we just keep it as "started" in the - # database so it'll not be used later on in this session. - with self.db.session.begin(): - self.db.guest_stop(guest_log) + except CuckooMachineError as e: + self.log.exception(str(e)) + @contextlib.contextmanager + def result_server(self) -> Generator[None, None, None]: + try: + ResultServer().add_task(self.task, self.machine) + except Exception as e: + self.log.exception("Failed to add task to result-server") + self.error_queue.put(e) + raise + try: + yield + finally: # After all this, we can make the ResultServer forget about the # internal state for this analysis task. ResultServer().del_task(self.task, self.machine) + @contextlib.contextmanager + def network_routing(self) -> Generator[None, None, None]: + self.route_network() + try: + yield + finally: # Drop the network routing rules if any. self.unroute_network() - if dead_machine: - # Remove the guest from the database, so that we can assign a - # new guest when the task is being analyzed with another machine. - with self.db.session.begin(): - self.db.guest_remove(guest_log) - scheduler.machinery.delete_machine(self.machine.name) + @contextlib.contextmanager + def run_auxiliary(self) -> Generator[None, None, None]: + aux = RunAuxiliary(task=self.task, machine=self.machine) - # Remove the analysis directory that has been created so - # far, as launch_analysis() is going to be doing that again. - shutil.rmtree(self.storage) + with self.db.session.begin(): + aux.start() - # This machine has turned dead, so we throw an exception here - # which informs the AnalysisManager that it should analyze - # this task again with another available machine. - raise CuckooDeadMachine() + try: + yield + finally: + with self.db.session.begin(): + aux.stop() + + def run_analysis_on_guest(self) -> None: + # Generate the analysis configuration file. + options = self.build_options() + + guest_manager = GuestManager(self.machine.name, self.machine.ip, self.machine.platform, self.task.id, self) + + with self.db.session.begin(): + if Config("web").guacamole.enabled and hasattr(self.machinery_manager.machinery, "store_vnc_port"): + self.machinery_manager.machinery.store_vnc_port(self.machine.label, self.task.id) + options["clock"] = self.db.update_clock(self.task.id) + self.db.guest_set_status(self.task.id, "starting") + guest_manager.start_analysis(options) + if guest_manager.get_status_from_db() == "starting": + guest_manager.set_status_in_db("running") + guest_manager.wait_for_completion() + + guest_manager.set_status_in_db("stopping") + + return + def perform_analysis(self) -> bool: + """Start analysis.""" + succeeded = False + self.socks5s = _load_socks5_operational() + + # Initialize the analysis folders. + if not self.init_storage(): + self.log.debug("Failed to initialize the analysis folder") + return False + + with self.db.session.begin(): + category_early_escape = self.category_checks() + if isinstance(category_early_escape, bool): + return category_early_escape + + # At this point, we're sure that this analysis requires a machine.
+ assert self.machinery_manager and self.machine and self.guest + + with self.db.session.begin(): + self.machinery_manager.scale_pool(self.machine) + + self.log.info("Starting analysis of %s '%s'", self.task.category.upper(), convert_to_printable(self.task.target)) + + with self.machine_running(), self.result_server(), self.network_routing(), self.run_auxiliary(): try: - # Release the analysis machine. But only if the machine has not turned dead yet. + self.run_analysis_on_guest() + except CuckooGuestError as e: + self.log.exception(str(e)) + else: + succeeded = True + finally: with self.db.session.begin(): - scheduler.machinery.release(self.machine.label) - - except CuckooMachineError as e: - log.error( - "Task #%s: Unable to release machine %s, reason %s. You might need to restore it manually", - self.task.id, - self.machine.label, - e, - ) + self.db.guest_stop(self.guest.id) return succeeded - def run(self): - """Run manager thread.""" - with scheduler.active_analysis_count_lock: - scheduler.active_analysis_count += 1 + def launch_analysis(self) -> None: + success = False try: - while True: - try: - success = self.launch_analysis() - except CuckooDeadMachine as e: - log.exception(e) - continue - - break - + success = self.perform_analysis() + except CuckooDeadMachine: + with self.db.session.begin(): + # Put the task back in pending so that the scheduler can attempt to + # choose a new machine. + self.db.set_status(self.task.id, TASK_PENDING) + raise + else: with self.db.session.begin(): self.db.set_status(self.task.id, TASK_COMPLETED) + self.log.info("Completed analysis %ssuccessfully.", "" if success else "un") - # If the task is still available in the database, update our task - # variable with what's in the database, as otherwise we're missing - # out on the status and completed_on change. This would then in - # turn thrown an exception in the analysisinfo processing module. - self.task = self.db.view_task(self.task.id) or self.task - self.db.session.expunge(self.task) + self.update_latest_symlink() - log.debug("Task #%s: Released database task with status %s", self.task.id, success) + def update_latest_symlink(self): + # We make a symbolic link ("latest") which links to the latest + # analysis - this is useful for debugging purposes. This is only + # supported under systems that support symbolic links. + if not hasattr(os, "symlink"): + return - # We make a symbolic link ("latest") which links to the latest - # analysis - this is useful for debugging purposes. This is only - # supported under systems that support symbolic links. - if hasattr(os, "symlink"): - latest = os.path.join(CUCKOO_ROOT, "storage", "analyses", "latest") + latest = os.path.join(CUCKOO_ROOT, "storage", "analyses", "latest") - # First we have to remove the existing symbolic link, then we have to create the new one. - # Deal with race conditions using a lock. - latest_symlink_lock.acquire() - try: - # As per documentation, lexists() returns True for dead symbolic links. - if os.path.lexists(latest): - path_delete(latest) + # First we have to remove the existing symbolic link, then we have to create the new one. + # Deal with race conditions using a lock. + with latest_symlink_lock: + try: + # As per documentation, lexists() returns True for dead symbolic links.
+ if os.path.lexists(latest): + path_delete(latest) - os.symlink(self.storage, latest) - except OSError as e: - log.warning("Task #%s: Error pointing latest analysis symlink: %s", self.task.id, e) - finally: - latest_symlink_lock.release() + os.symlink(self.storage, latest) + except OSError as e: + self.log.warning("Error pointing latest analysis symlink: %s", e) - log.info("Task #%s: analysis procedure completed", self.task.id) - except Exception as e: - log.exception("Task #%s: Failure in AnalysisManager.run: %s", self.task.id, e) + def run(self): + """Run manager thread.""" + try: + self.launch_analysis() + except Exception: + self.log.exception("failure in AnalysisManager.run") + else: + self.log.info("analysis procedure completed") finally: - with self.db.session.begin(): - self.db.set_status(self.task.id, TASK_COMPLETED) - task_log_stop(self.task.id) - with scheduler.active_analysis_count_lock: - scheduler.active_analysis_count -= 1 + if self.done_callback: + self.done_callback(self) def _rooter_response_check(self): if self.rooter_response and self.rooter_response["exception"] is not None: @@ -537,6 +509,7 @@ def _rooter_response_check(self): def route_network(self): """Enable network routing if desired.""" # Determine the desired routing strategy (none, internet, VPN). + routing = Config("routing") self.route = routing.routing.route if self.task.route: @@ -562,14 +535,14 @@ def route_network(self): elif self.route in self.socks5s: self.interface = "" else: - log.warning("Unknown network routing destination specified, ignoring routing for this analysis: %s", self.route) + self.log.warning("Unknown network routing destination specified, ignoring routing for this analysis: %s", self.route) self.interface = None self.rt_table = None # Check if the network interface is still available. If a VPN dies for # some reason, its tunX interface will no longer be available. 
if self.interface and not rooter("nic_available", self.interface): - log.error( + self.log.error( "The network interface '%s' configured for this analysis is " "not available at the moment, switching to route=none mode", self.interface, ) @@ -613,7 +586,7 @@ def route_network(self): # check if the interface is up if HAVE_NETWORKIFACES and routing.routing.verify_interface and self.interface and self.interface not in network_interfaces: - log.info("Network interface {} not found, falling back to dropping network traffic", self.interface) + self.log.info("Network interface %s not found, falling back to dropping network traffic", self.interface) self.interface = None self.rt_table = None self.route = "drop" @@ -632,13 +605,14 @@ def route_network(self): ) self._rooter_response_check() - log.info("Enabled route '%s'.", self.route) + self.log.info("Enabled route '%s'.", self.route) if self.rt_table: self.rooter_response = rooter("srcroute_enable", self.rt_table, self.machine.ip) self._rooter_response_check() def unroute_network(self): + routing = Config("routing") if self.interface: self.rooter_response = rooter("forward_disable", self.machine.interface, self.interface, self.machine.ip) self._rooter_response_check() @@ -652,7 +626,7 @@ def unroute_network(self): "hostports_reject_disable", self.machine.interface, self.machine.ip, self.reject_hostports ) self._rooter_response_check() - log.info("Disabled route '%s'", self.route) + self.log.info("Disabled route '%s'", self.route) if self.rt_table: self.rooter_response = rooter("srcroute_disable", self.rt_table, self.machine.ip) diff --git a/lib/cuckoo/core/database.py b/lib/cuckoo/core/database.py index b04463db8de..3306f62dc88 100644 --- a/lib/cuckoo/core/database.py +++ b/lib/cuckoo/core/database.py @@ -12,7 +12,7 @@ import sys from contextlib import suppress from datetime import datetime, timedelta -from typing import Any, List, Optional +from typing import Any, List, Optional, Union, cast # Sflock does a good filetype recon from sflock.abstracts import File as SflockFile @@ -28,6 +28,7 @@ CuckooDatabaseInitializationError, CuckooDependencyError, CuckooOperationalError, + CuckooUnserviceableTaskError, ) from lib.cuckoo.common.integrations.parse_pe import PortableExecutable from lib.cuckoo.common.objects import PCAP, URL, File, Static @@ -50,14 +51,13 @@ event, func, not_, - or_, select, ) from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.orm import backref, declarative_base, joinedload, relationship, scoped_session, sessionmaker Base = declarative_base() -except ImportError: +except ImportError: # pragma: no cover raise CuckooDependencyError("Unable to import sqlalchemy (install with `poetry run pip install sqlalchemy`)") @@ -156,7 +156,6 @@ ) MACHINE_RUNNING = "running" -MACHINE_SCHEDULED = "scheduled" # Secondary table used in association Machine - Tag. machines_tags = Table( @@ -234,7 +233,7 @@ class Machine(Base): reserved = Column(Boolean(), nullable=False, default=False) def __repr__(self): - return f"<Machine('{self.id}','{self.name}')>" + return f"<Machine({self.id}, {self.name})>" def to_dict(self): """Converts object to dict. @@ -280,7 +279,7 @@ class Tag(Base): name = Column(String(255), nullable=False, unique=True) def __repr__(self): - return f"<Tag('{self.id}','{self.name}')>" + return f"<Tag({self.id}, {self.name})>" def __init__(self, name): self.name = name @@ -302,7 +301,7 @@ class Guest(Base): task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False, unique=True) def __repr__(self): - return f"<Guest('{self.id}','{self.name}')>" + return f"<Guest({self.id}, {self.name})>" def to_dict(self): """Converts object to dict.
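The machines_tags association table above gives Machine and Tag a many-to-many relationship, and it is what the tag-based machine selection in filter_machines_to_task() builds on. A minimal sketch of the query shape (illustrative only; the patch's real filter also handles label, platform, arch, and os_version):

    from typing import List

    from sqlalchemy.orm import Session

    from lib.cuckoo.core.database import Machine, Tag

    def machines_with_all_tags(session: Session, tag_names: List[str]) -> List[Machine]:
        query = session.query(Machine)
        for name in tag_names:
            # One EXISTS subquery per required tag: a machine qualifies
            # only if it carries every requested tag.
            query = query.filter(Machine.tags.any(Tag.name == name))
        return query.all()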
@@ -323,11 +322,12 @@ def to_json(self): """ return json.dumps(self.to_dict()) - def __init__(self, name, label, platform, manager): + def __init__(self, name, label, platform, manager, task_id): self.name = name self.label = label self.platform = platform self.manager = manager + self.task_id = task_id class Sample(Base): @@ -353,7 +353,7 @@ class Sample(Base): ) def __repr__(self): - return f"<Sample('{self.id}','{self.sha256}')>" + return f"<Sample({self.id}, {self.sha256})>" def to_dict(self): """Converts object to dict. @@ -423,7 +423,7 @@ def __init__(self, message, task_id): self.task_id = task_id def __repr__(self): - return f"<Error('{self.id}','{self.message}','{self.task_id}')>" + return f"<Error({self.id}, {self.message}, {self.task_id})>" class Task(Base): @@ -544,7 +544,7 @@ def __init__(self, target=None): self.target = target def __repr__(self): - return f"<Task('{self.id}','{self.target}')>" + return f"<Task({self.id}, {self.target})>" class AlembicVersion(Base): @@ -574,7 +574,7 @@ def __init__(self, dsn=None, schema_check=True): self._connect_database(self.cfg.database.connection) else: file_path = os.path.join(CUCKOO_ROOT, "db", "cuckoo.db") - if not path_exists(file_path): + if not path_exists(file_path): # pragma: no cover db_dir = os.path.dirname(file_path) if not path_exists(db_dir): try: @@ -594,12 +594,13 @@ # Create schema. try: Base.metadata.create_all(self.engine) - except SQLAlchemyError as e: + except SQLAlchemyError as e: # pragma: no cover raise CuckooDatabaseError(f"Unable to create or connect to database: {e}") # Get db session. - self.session = scoped_session(sessionmaker(bind=self.engine)) + self.session = scoped_session(sessionmaker(bind=self.engine, expire_on_commit=False)) + # There should be a better way to clean up orphans. This runs after every flush, which is crazy. @event.listens_for(self.session, "after_flush") def delete_tag_orphans(session, ctx): session.query(Tag).filter(~Tag.tasks.any()).filter(~Tag.machines.any()).delete(synchronize_session=False) @@ -613,12 +614,12 @@ def delete_tag_orphans(session, ctx): tmp_session.add(AlembicVersion(version_num=SCHEMA_VERSION)) try: tmp_session.commit() - except SQLAlchemyError as e: + except SQLAlchemyError as e: # pragma: no cover tmp_session.rollback() raise CuckooDatabaseError(f"Unable to set schema version: {e}") else: # Check if db version is the expected one. - if last.version_num != SCHEMA_VERSION and schema_check: + if last.version_num != SCHEMA_VERSION and schema_check: # pragma: no cover print( f"DB schema version mismatch: found {last.version_num}, expected {SCHEMA_VERSION}. Try to apply all migrations" ) @@ -647,7 +648,7 @@ def _connect_database(self, connection_string): ) else: self.engine = create_engine(connection_string) - except ImportError as e: + except ImportError as e: # pragma: no cover lib = str(e).rsplit(maxsplit=1)[-1] raise CuckooDependencyError(f"Missing database driver, unable to import {lib} (install with `pip install {lib}`)") @@ -662,6 +663,7 @@ def _get_or_create(self, model, **kwargs): return instance else: instance = model(**kwargs) + self.session.add(instance) return instance def drop(self): @@ -691,7 +693,9 @@ def delete_machine(self, name) -> bool: log.warning(f"{name} does not exist in the database.") return False - def add_machine(self, name, label, arch, ip, platform, tags, interface, snapshot, resultserver_ip, resultserver_port, reserved): + def add_machine( + self, name, label, arch, ip, platform, tags, interface, snapshot, resultserver_ip, resultserver_port, reserved, locked=False + ) -> Machine: """Add a guest machine.
@param name: machine id @param label: machine label @@ -721,6 +725,8 @@ def add_machine(self, name, label, arch, ip, platform, tags, interface, snapshot if tags: for tag in tags.replace(" ", "").split(","): machine.tags.append(self._get_or_create(Tag, name=tag)) + if locked: + machine.locked = True self.session.add(machine) return machine @@ -755,50 +761,36 @@ def update_clock(self, task_id): row.clock = datetime.utcnow() return row.clock - def set_status(self, task_id, status): - """Set task status. - @param task_id: task identifier - @param status: status string - @return: operation status - """ - row = self.session.get(Task, task_id) - - if not row: - return - + def set_task_status(self, task: Task, status) -> Task: if status != TASK_DISTRIBUTED_COMPLETED: - row.status = status + task.status = status if status in (TASK_RUNNING, TASK_DISTRIBUTED): - row.started_on = datetime.now() + task.started_on = datetime.now() elif status in (TASK_COMPLETED, TASK_DISTRIBUTED_COMPLETED): - row.completed_on = datetime.now() + task.completed_on = datetime.now() + + self.session.add(task) + return task - def set_task_vm_and_guest_start(self, task_id, vmname, vmlabel, vmplatform, vm_id, manager, options=None): - """Set task status and logs guest start. + def set_status(self, task_id: int, status) -> Optional[Task]: + """Set task status. @param task_id: task identifier - @param vmname: virtual vm name - @param label: vm label - @param manager: vm manager - @param options: optional task options - @return: guest row id + @param status: status string + @return: operation status """ - row = self.session.get(Task, task_id) + task = self.session.get(Task, task_id) - if not row: - return + if not task: + return None - # Use a nested transaction so that the Guest gets an id that can be stored in the Task row. - with self.session.begin_nested(): - guest = Guest(vmname, vmlabel, vmplatform, manager) - guest.status = "init" - self.session.add(guest) - row.guest = guest - row.machine = vmname - row.machine_id = vm_id - if options: - row.options = options - return guest.id + return self.set_task_status(task, status) + + def create_guest(self, machine: Machine, manager: str, task: Task) -> Guest: + guest = Guest(machine.name, machine.label, machine.platform, manager, task.id) + guest.status = "init" + self.session.add(guest) + return guest def _package_vm_requires_check(self, package: str) -> list: """ @@ -829,84 +821,39 @@ def validate_task_parameters(self, label: str, platform: str, tags: list) -> boo return False return True - def is_relevant_machine_available(self, task: Task, set_status: bool = True) -> bool: - """Checks if a machine that is relevant to the given task is available - @param task: task to validate - @param set_status: boolean which indicate if the status of the task should be changed to TASK_RUNNING in the DB. - @return: boolean indicating if a relevant machine is available + def find_machine_to_service_task(self, task: Task) -> Optional[Machine]: + """Find a machine that is able to service the given task. + Returns: The Machine if an available machine was found; None if there is at least 1 machine + that *could* service it, but they are all currently in use. + Raises: CuckooUnserviceableTaskError if there are no machines in the pool that would be able + to service it. 
""" task_archs, task_tags = self._task_arch_tags_helper(task) os_version = self._package_vm_requires_check(task.package) - vms = self.list_machines( - locked=False, + if not self.validate_task_parameters(label=task.machine, platform=task.platform, tags=task_tags): + raise CuckooUnserviceableTaskError("Invalid task parameters") + machines = self.session.query(Machine).options(joinedload(Machine.tags)) + machines = self.filter_machines_to_task( + machines=machines, label=task.machine, platform=task.platform, tags=task_tags, - arch=task_archs, + archs=task_archs, os_version=os_version, - include_scheduled=False, ) - if len(vms) > 0: - # There are? Awesome! - if set_status: - self.set_status(task_id=task.id, status=TASK_RUNNING) - assigned = vms[0] # Take the first vm which could be assigned - self.set_machine_status(assigned.label, MACHINE_SCHEDULED) - return True - return False - - def map_tasks_to_available_machines(self, tasks: list) -> list: - """Map tasks to available_machines to schedule in batch and prevent double spending of machines - @param tasks: List of tasks to filter - @return: list of tasks that should be started by the scheduler - """ - results = [] - assigned_machines = [] - for task in tasks: - task_archs, task_tags = self._task_arch_tags_helper(task) - os_version = self._package_vm_requires_check(task.package) - machine = None - if not self.validate_task_parameters(label=task.machine, platform=task.platform, tags=task_tags): - continue - machines = self.session.query(Machine).options(joinedload(Machine.tags)).filter_by(locked=False) - machines = self.filter_machines_to_task( - machines=machines, - label=task.machine, - platform=task.platform, - tags=task_tags, - archs=task_archs, - os_version=os_version, - ) - # This loop is there in order to prevent double spending of machines by filtering - # out already mapped machines - for assigned in assigned_machines: - machines = machines.filter(Machine.label.notlike(assigned.label)) - machines = machines.filter(or_(Machine.status.notlike(MACHINE_SCHEDULED), Machine.status == None)) # noqa: E711 - # Get the first free machine. - machine = machines.first() - if machine: - assigned_machines.append(machine) - self.set_status(task_id=task.id, status=TASK_RUNNING) - results.append(task) - for assigned in assigned_machines: - self.set_machine_status(assigned.label, MACHINE_SCHEDULED) - return results - - def is_serviceable(self, task: Task) -> bool: - """Checks if the task is serviceable. - - This method is useful when there are tasks that will never be serviced - by any of the machines available. This allows callers to decide what to - do when tasks like this are created. - - @return: boolean indicating if any machine could service the task in the future - """ - task_archs, task_tags = self._task_arch_tags_helper(task) - os_version = self._package_vm_requires_check(task.package) - vms = self.list_machines(label=task.machine, platform=task.platform, tags=task_tags, arch=task_archs, os_version=os_version) - if len(vms) > 0: - return True - return False + # Select for update a machine, preferring one that is available and was the one that was used the + # longest time ago. This will give us a machine that can get locked or, if there are none that are + # currently available, we'll at least know that the task is serviceable. 
+ machine = cast( + Optional[Machine], machines.order_by(Machine.locked, Machine.locked_changed_on).with_for_update(of=Machine).first() + ) + if machine is None: + raise CuckooUnserviceableTaskError + if machine.locked: + # There aren't any machines that can service the task NOW, but there is at least one in the pool + # that could service it once it's available. + return None + return machine def fetch_task(self, categories: list = None): """Fetches a task waiting to be processed and locks it for running. @@ -951,7 +898,8 @@ def guest_set_status(self, task_id, status): def guest_remove(self, guest_id): """Removes a guest start entry.""" guest = self.session.get(Guest, guest_id) - self.session.delete(guest) + if guest: + self.session.delete(guest) def guest_stop(self, guest_id): """Logs guest stop. @@ -1010,7 +958,6 @@ def list_machines( arch=None, include_reserved=False, os_version=None, - include_scheduled=True, ) -> List[Machine]: """Lists virtual machines. @return: list of virtual machines @@ -1033,49 +980,32 @@ def list_machines( os_version=os_version, include_reserved=include_reserved, ) - if not include_scheduled: - machines = machines.filter(or_(Machine.status.notlike(MACHINE_SCHEDULED), Machine.status == None)) # noqa: E711 return machines.all() - def lock_machine(self, label=None, platform=None, tags=None, arch=None, os_version=None, need_scheduled=False): + def assign_machine_to_task(self, task: Task, machine: Optional[Machine]) -> Task: + if machine: + task.machine = machine.label + task.machine_id = machine.id + else: + task.machine = None + task.machine_id = None + self.session.add(task) + return task + + def lock_machine(self, machine: Machine) -> Machine: """Places a lock on a free virtual machine. - @param label: optional virtual machine label - @param platform: optional virtual machine platform - @param tags: optional tags required (list) - @param arch: optional virtual machine arch - @param os_version: tags to filter per OS version. Ex: winxp, win7, win10, win11 - @param need_scheduled: should the result be filtered on 'scheduled' machine status + @param machine: the Machine to lock @return: locked machine """ - if not self.validate_task_parameters(label=label, platform=platform, tags=tags): - return None - - machines = self.session.query(Machine) - machines = self.filter_machines_to_task( - machines=machines, label=label, platform=platform, tags=tags, archs=arch, os_version=os_version - ) - # Check if there are any machines that satisfy the - # selection requirements. - if not machines.count(): - raise CuckooOperationalError( - "No machines match selection criteria of label: '%s', platform: '%s', arch: '%s', tags: '%s'" - % (label, platform, arch, tags) - ) - if need_scheduled: - machines = machines.filter(Machine.status.like(MACHINE_SCHEDULED)) - # Get the first free machine. - machine = machines.options(joinedload(Machine.tags)).filter_by(locked=False).with_for_update().first() - - if machine: - machine.locked = True - machine.locked_changed_on = datetime.now() - # XXX I'm not sure that this should be set here. - self.set_machine_status(machine.label, MACHINE_RUNNING) + machine.locked = True + machine.locked_changed_on = datetime.now() + self.set_machine_status(machine, MACHINE_RUNNING) + self.session.add(machine) return machine def unlock_machine(self, machine: Machine) -> Machine: - """Remove lock form a virtual machine. + """Remove lock from a virtual machine. @param machine: The Machine to unlock. 
@return: unlocked machine """
@@ -1113,27 +1043,24 @@ def get_available_machines(self) -> List[Machine]: machines = self.session.query(Machine).options(joinedload(Machine.tags)).filter_by(locked=False).all() return machines
- def get_machines_scheduled(self):
+ def count_machines_running(self) -> int:
machines = self.session.query(Machine)
- machines = machines.filter(Machine.status.like(MACHINE_SCHEDULED))
+ machines = machines.filter_by(locked=True)
return machines.count()
- def set_machine_status(self, label, status):
+ def set_machine_status(self, machine_or_label: Union[str, Machine], status):
"""Set status for a virtual machine.
@param machine_or_label: a Machine object, or the label of a virtual machine
@param status: new virtual machine status
"""
- machine = self.session.query(Machine).filter_by(label=label).first()
+ if isinstance(machine_or_label, str):
+ machine = self.session.query(Machine).filter_by(label=machine_or_label).first()
+ else:
+ machine = machine_or_label
if machine: machine.status = status machine.status_changed_on = datetime.now()
-
- def check_machines_scheduled_timeout(self):
- machines = self.session.query(Machine)
- machines = machines.filter(Machine.status.like(MACHINE_SCHEDULED))
- for machine in machines:
- if machine.status_changed_on + timedelta(seconds=30) < datetime.now():
- self.set_machine_status(machine.label, MACHINE_RUNNING)
+ self.session.add(machine)
def add_error(self, message, task_id): """Add an error related to a task.
@@ -1316,11 +1243,12 @@ def add( # Deal with tags format (i.e., foo,bar,baz) if tags: for tag in tags.split(","):
- if tag.strip():
+ tag_name = tag.strip()
+ if tag_name and tag_name not in [tag.name for tag in task.tags]:
# "Task" object is being merged into a Session along the backref cascade path for relationship "Tag.tasks"; in SQLAlchemy 2.0, this reverse cascade will not take place.
# Set cascade_backrefs to False in either the relationship() or backref() function for the 2.0 behavior; or to set globally for the whole Session, set the future=True flag
# (Background on this error at: https://sqlalche.me/e/14/s9r1) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
- task.tags.append(self._get_or_create(Tag, name=tag.strip()))
+ task.tags.append(self._get_or_create(Tag, name=tag_name))
if clock: if isinstance(clock, str):
@@ -1972,6 +1900,7 @@ def list_tasks( task_ids=False, include_hashes=False, user_id=None,
+ for_update=False,
) -> List[Task]: """Retrieve list of task. @param limit: specify a limit of entries.
@@ -2046,7 +1975,10 @@ def list_tasks( else: search = search.order_by(Task.added_on.desc())
- tasks = search.limit(limit).offset(offset).all()
+ search = search.limit(limit).offset(offset)
+ if for_update:
+ search = search.with_for_update(of=Task)
+ tasks = search.all()
return tasks
diff --git a/lib/cuckoo/core/guest.py b/lib/cuckoo/core/guest.py index 507d6f5bf76..075b80704d2 100644 --- a/lib/cuckoo/core/guest.py +++ b/lib/cuckoo/core/guest.py
@@ -358,17 +358,16 @@ def wait_for_completion(self): start = timeit.default_timer() while self.do_run and self.get_status_from_db() == "running":
+ time.sleep(1)
+
if cfg.cuckoo.machinery_screenshots: if count == 0: # indicate screenshot captures have started log.info("Task #%s: Started capturing screenshots for %s", self.task_id, self.vmid) self.analysis_manager.screenshot_machine()
- if count >= 5:
- log.debug("Task #%s: Analysis is still running (id=%s, ip=%s)", self.task_id, self.vmid, self.ipaddr)
- count = 0
- count += 1
- time.sleep(1)
+ count += 1
+ if count % 5 == 0:
+ log.debug("Task #%s: Analysis is still running (id=%s, ip=%s)", self.task_id, self.vmid, self.ipaddr)
# If the analysis hits the critical timeout, just return straight # away and try to recover the analysis results from the guest.
diff --git a/lib/cuckoo/core/machinery_manager.py b/lib/cuckoo/core/machinery_manager.py new file mode 100644 index 00000000000..5f36dadf2a3 --- /dev/null +++ b/lib/cuckoo/core/machinery_manager.py
@@ -0,0 +1,307 @@
+import logging
+import threading
+import time
+from time import monotonic as _time
+from typing import Optional, Union
+
+from lib.cuckoo.common.abstracts import Machinery
+from lib.cuckoo.common.config import Config
+from lib.cuckoo.common.exceptions import CuckooCriticalError, CuckooMachineError
+from lib.cuckoo.core.database import Database, Machine, Task, _Database
+from lib.cuckoo.core.plugins import list_plugins
+from lib.cuckoo.core.rooter import rooter, vpns
+
+log = logging.getLogger(__name__)
+
+routing = Config("routing")
+
+
+class ScalingBoundedSemaphore(threading.Semaphore):
+ """Implements a dynamic bounded semaphore.
+
+ A bounded semaphore checks to make sure its current value doesn't exceed its
+ limit value. If it does, ValueError is raised. In most situations
+ semaphores are used to guard resources with limited capacity.
+
+ If the semaphore is released too many times it's a sign of a bug. If not
+ given, value defaults to 1.
+
+ Like regular semaphores, bounded semaphores manage a counter representing
+ the number of release() calls minus the number of acquire() calls, plus a
+ limit value. The acquire() method blocks if necessary until it can return
+ without making the counter negative. If not given, value defaults to 1.
+
+ In this version of the semaphore there is an upper limit that its limit value
+ can never exceed when it is changed. The idea behind it is that the machinery
+ documentation specifies a limit on the machines that can be available, so there
+ is no point in having a limit value higher than that.
+ """
+
+ def __init__(self, value=1, upper_limit=1):
+ threading.Semaphore.__init__(self, value)
+ self._limit_value = value
+ self._upper_limit = upper_limit
+
+ def acquire(self, blocking=True, timeout=None):
+ """Acquire a semaphore, decrementing the internal counter by one.
+
+ When invoked without arguments: if the internal counter is larger than
+ zero on entry, decrement it by one and return immediately. If it is zero
+ on entry, block, waiting until some other thread has called release() to
+ make it larger than zero.
This is done with proper interlocking so that
+ if multiple acquire() calls are blocked, release() will wake exactly one
+ of them up. The implementation may pick one at random, so the order in
+ which blocked threads are awakened should not be relied on. There is no
+ return value in this case.
+
+ When invoked with blocking set to true, do the same thing as when called
+ without arguments, and return true.
+
+ When invoked with blocking set to false, do not block. If a call without
+ an argument would block, return false immediately; otherwise, do the
+ same thing as when called without arguments, and return true.
+
+ When invoked with a timeout other than None, it will block for at
+ most timeout seconds. If acquire does not complete successfully in
+ that interval, return false. Return true otherwise.
+
+ """
+ if not blocking and timeout is not None:
+ raise ValueError("Cannot specify timeout for non-blocking acquire()")
+ rc = False
+ endtime = None
+ with self._cond:
+ while self._value == 0:
+ if not blocking:
+ break
+ if timeout is not None:
+ if endtime is None:
+ endtime = _time() + timeout
+ else:
+ timeout = endtime - _time()
+ if timeout <= 0:
+ break
+ self._cond.wait(timeout)
+ else:
+ self._value -= 1
+ rc = True
+ return rc
+
+ __enter__ = acquire
+
+ def release(self):
+ """Release a semaphore, incrementing the internal counter by one.
+
+ When the counter is zero on entry and another thread is waiting for it
+ to become larger than zero again, wake up that thread.
+
+ If the number of releases exceeds the number of acquires,
+ raise a ValueError.
+
+ """
+ with self._cond:
+ if self._value > self._upper_limit:
+ raise ValueError("Semaphore released too many times")
+ if self._value >= self._limit_value:
+ self._value = self._limit_value
+ self._cond.notify()
+ return
+ self._value += 1
+ self._cond.notify()
+
+ def __exit__(self, t, v, tb):
+ self.release()
+
+ def update_limit(self, value):
+ """Update the limit value for the semaphore
+
+ This limit value is the bounded limit, and proposed limit values
+ are validated against the upper limit.
+
+ """
+ if 0 < value < self._upper_limit:
+ self._limit_value = value
+ if self._value > value:
+ self._value = value
+
+ def check_for_starvation(self, available_count: int):
+ """Check for, and correct, starvation that can come up after updating the limit.
+ Takes the current number of available machines as its parameter.
+ Returns true if the internal counter had to be resynchronized.
+ """
+ if self._value == 0 and available_count == self._limit_value:
+ self._value = self._limit_value
+ return True
+ # Resync of the lock value
+ if abs(self._value - available_count) > 0:
+ self._value = available_count
+ return True
+ return False
+
+
+MachineryLockType = Union[threading.Lock, threading.BoundedSemaphore, ScalingBoundedSemaphore]
+
+
+class MachineryManager:
+ def __init__(self):
+ self.cfg = Config()
+ self.db: _Database = Database()
+ self.machinery_name: str = self.cfg.cuckoo.machinery
+ self.machinery: Machinery = self.create_machinery()
+ self.pool_scaling_lock = threading.Lock()
+ if self.machinery.module_name != self.machinery_name:
+ raise CuckooCriticalError(
+ f"Incorrect machinery module was imported. 
" + f"Should've been {self.machinery_name} but was {self.machinery.module_name}" + ) + log.info( + "Using %s with max_machines_count=%d", + self, + self.cfg.cuckoo.max_machines_count, + ) + self.machine_lock: MachineryLockType = self.create_machine_lock() + + def __str__(self): + return f"{self.__class__.__name__}[{self.machinery_name}]" + + def create_machine_lock(self) -> MachineryLockType: + retval: MachineryLockType = threading.Lock() + + # You set this value if you are using a machinery that is NOT auto-scaling + max_vmstartup_count = self.cfg.cuckoo.max_vmstartup_count + if max_vmstartup_count: + # The BoundedSemaphore is used to prevent CPU starvation when starting up multiple VMs + log.info("max_vmstartup_count for BoundedSemaphore = %d", max_vmstartup_count) + retval = threading.BoundedSemaphore(max_vmstartup_count) + + # You set this value if you are using a machinery that IS auto-scaling + elif self.cfg.cuckoo.scaling_semaphore: + # If the user wants to use the scaling bounded semaphore, check what machinery is specified, and then + # grab the required configuration key for setting the upper limit + machinery_opts = self.machinery.options.get(self.machinery_name) + machines_limit: int = 0 + if self.machinery_name == "az": + machines_limit = machinery_opts.get("total_machines_limit") + elif self.machinery_name == "aws": + machines_limit = machinery_opts.get("dynamic_machines_limit") + if machines_limit: + # The ScalingBoundedSemaphore is used to keep feeding available machines from the pending tasks queue + log.info("upper limit for ScalingBoundedSemaphore = %d", machines_limit) + retval = ScalingBoundedSemaphore(value=len(machinery_opts["machines"]), upper_limit=machines_limit) + else: + log.warning( + "scaling_semaphore is set but the %s machinery does not set the machines limit. Ignoring scaling semaphore.", + self.machinery_name, + ) + + return retval + + @staticmethod + def create_machinery() -> Machinery: + # Get registered class name. Only one machine manager is imported, + # therefore there should be only one class in the list. + plugin = list_plugins("machinery")[0] + machinery: Machinery = plugin() + + return machinery + + def find_machine_to_service_task(self, task: Task) -> Optional[Machine]: + machine = self.db.find_machine_to_service_task(task) + if machine: + log.info( + "Task #%s: found useable machine %s (arch=%s, platform=%s)", + task.id, + machine.name, + machine.arch, + machine.platform, + ) + else: + log.debug( + "Task #%s: no machine available yet for task requiring machine '%s', platform '%s' or tags '%s'.", + task.id, + task.machine, + task.platform, + task.tags, + ) + + return machine + + def initialize_machinery(self) -> None: + """Initialize the machines in the database and initialize routing for them.""" + try: + self.machinery.initialize() + except CuckooMachineError as e: + raise CuckooCriticalError("Error initializing machines") from e + + # At this point all the available machines should have been identified + # and added to the list. If none were found, Cuckoo needs to abort the + # execution. 
+ available_machines = list(self.machinery.machines())
+ if not len(available_machines):
+ raise CuckooCriticalError("No machines available")
+ else:
+ log.info("Loaded %d machine%s", len(available_machines), "s" if len(available_machines) != 1 else "")
+
+ if len(available_machines) > 1 and self.db.engine.name == "sqlite":
+ log.warning(
+ "As you've configured CAPE to execute parallel analyses, we recommend switching to a PostgreSQL database, as SQLite might cause some issues"
+ )
+
+ # Drop all existing packet forwarding rules for each VM. Just in case
+ # Cuckoo was terminated for some reason and various forwarding rules
+ # have thus not been dropped yet.
+ for machine in available_machines:
+ rooter("inetsim_disable", machine.ip)
+ if not machine.interface:
+ log.info(
+ "Unable to determine the network interface for VM with name %s, CAPE will not be able to give it "
+ "full internet access or route it through a VPN! Please define a default network interface for the "
+ "machinery or define a network interface for each VM",
+ machine.name,
+ )
+ continue
+
+ # Drop forwarding rule to each VPN.
+ for vpn in vpns.values():
+ rooter("forward_disable", machine.interface, vpn.interface, machine.ip)
+
+ # Drop forwarding rule to the internet / dirty line.
+ if routing.routing.internet != "none":
+ rooter("forward_disable", machine.interface, routing.routing.internet, machine.ip)
+
+ threading.Thread(target=self.thr_maintain_scaling_bounded_semaphore, daemon=True).start()
+
+ def running_machines_max_reached(self) -> bool:
+ """Return true if we've reached the maximum number of running machines."""
+ return 0 < self.cfg.cuckoo.max_machines_count <= self.machinery.running_count()
+
+ def scale_pool(self, machine: Machine) -> None:
+ """For machinery backends that support auto-scaling, make sure that enough machines
+ are spun up. For other types of machinery, this is basically a noop. This is called
+ from the AnalysisManager (i.e. child) thread, so we use a lock to make sure that
+ it doesn't get called multiple times simultaneously. We don't want to call it from
+ the main thread as that would block the scheduler while machines are spun up.
+ Note that the az machinery maintains its own thread to monitor the size of the pool.
+ """
+ with self.pool_scaling_lock:
+ self.machinery.scale_pool(machine)
+
+ def start_machine(self, machine: Machine) -> None:
+ with self.machine_lock:
+ self.machinery.start(machine.label)
+
+ def stop_machine(self, machine: Machine) -> None:
+ self.machinery.stop(machine.label)
+
+ def thr_maintain_scaling_bounded_semaphore(self) -> None:
+ """Maintain the limit of the ScalingBoundedSemaphore if one is being used."""
+ if not isinstance(self.machine_lock, ScalingBoundedSemaphore) or not self.cfg.cuckoo.scaling_semaphore_update_timer:
+ return
+
+ while True:
+ with self.db.session.begin():
+ # Here be dragons! Making these calls on the ScalingBoundedSemaphore is not
+ # thread safe.
+ self.machine_lock.update_limit(len(self.machinery.machines()))
+ self.machine_lock.check_for_starvation(self.machinery.availables(include_reserved=True))
+ time.sleep(self.cfg.cuckoo.scaling_semaphore_update_timer)
diff --git a/lib/cuckoo/core/scheduler.py b/lib/cuckoo/core/scheduler.py index 2835a247ffb..037e03dbd9f 100644 --- a/lib/cuckoo/core/scheduler.py +++ b/lib/cuckoo/core/scheduler.py
@@ -2,6 +2,7 @@ # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org # See the file 'docs/LICENSE' for copying permission.
+import contextlib
import enum import logging import os
@@ -10,26 +11,18 @@ import threading import time from collections import defaultdict
-from time import monotonic as _time
+from typing import DefaultDict, List, Optional, Tuple
from lib.cuckoo.common.config import Config from lib.cuckoo.common.constants import CUCKOO_ROOT
-from lib.cuckoo.common.exceptions import CuckooCriticalError, CuckooMachineError
-from lib.cuckoo.common.utils import free_space_monitor, load_categories
+from lib.cuckoo.common.exceptions import CuckooUnserviceableTaskError
+from lib.cuckoo.common.utils import CATEGORIES_NEEDING_VM, free_space_monitor, load_categories
from lib.cuckoo.core.analysis_manager import AnalysisManager
-from lib.cuckoo.core.database import TASK_FAILED_ANALYSIS, TASK_PENDING, Database, Task
-from lib.cuckoo.core.plugins import list_plugins
-from lib.cuckoo.core.rooter import rooter, vpns
+from lib.cuckoo.core.database import TASK_FAILED_ANALYSIS, TASK_PENDING, Database, Machine, Task, _Database
+from lib.cuckoo.core.machinery_manager import MachineryManager
log = logging.getLogger(__name__)
-machinery = None
-machine_lock = None
-routing = Config("routing")
-
-active_analysis_count = 0
-active_analysis_count_lock = threading.Lock()
-
class LoopState(enum.IntEnum): """Enum that represents the state of the main scheduler loop."""
@@ -40,165 +33,241 @@ class LoopState(enum.IntEnum): INACTIVE = 4
-class ScalingBoundedSemaphore(threading.Semaphore):
- """Implements a dynamic bounded semaphore.
+class SchedulerCycleDelay(enum.IntEnum):
+ SUCCESS = 0
+ NO_PENDING_TASKS = 1
+ MAX_MACHINES_RUNNING = 1
+ SCHEDULER_PAUSED = 5
+ FAILURE = 5
+ LOW_DISK_SPACE = 30
- A bounded semaphore checks to make sure its current value doesn't exceed its
- limit value. If it does, ValueError is raised. In most situations
- semaphores are used to guard resources with limited capacity.
- If the semaphore is released too many times it's a sign of a bug. If not
- given, value defaults to 1.
-
- Like regular semaphores, bounded semaphores manage a counter representing
- the number of release() calls minus the number of acquire() calls, plus a
- limit value. The acquire() method blocks if necessary until it can return
- without making the counter negative. If not given, value defaults to 1.
+class Scheduler:
+ """Tasks Scheduler.
- In this version of semaphore there is an upper limit where its limit value
- can never reach when it is changed. The idea behind it is that in machinery
- documentation there is a limit of machines that can be available so there is
- no point having it higher than that.
+ This class is responsible for the main execution loop of the tool. It
+ prepares the analysis machines and keeps waiting for and loading new
+ analysis tasks.
+ Whenever a new task is available, it launches AnalysisManager which will
+ take care of running the full analysis process and operating with the
+ assigned analysis machine.
""" - def __init__(self, value=1, upper_limit=1): - threading.Semaphore.__init__(self, value) - self._limit_value = value - self._upper_limit = upper_limit + def __init__(self, maxcount=0): + self.loop_state = LoopState.INACTIVE + self.cfg = Config() + self.db: _Database = Database() + self.max_analysis_count: int = maxcount or self.cfg.cuckoo.max_analysis_count + self.analysis_threads_lock = threading.Lock() + self.total_analysis_count: int = 0 + self.analysis_threads: List[AnalysisManager] = [] + self.analyzing_categories, categories_need_VM = load_categories() + self.machinery_manager = MachineryManager() if categories_need_VM else None + log.info("Creating scheduler with max_analysis_count=%s", self.max_analysis_count or "unlimited") + + @property + def active_analysis_count(self) -> int: + with self.analysis_threads_lock: + return len(self.analysis_threads) + + def analysis_finished(self, analysis_manager: AnalysisManager): + with self.analysis_threads_lock: + try: + self.analysis_threads.remove(analysis_manager) + except ValueError: + pass - def acquire(self, blocking=True, timeout=None): - """Acquire a semaphore, decrementing the internal counter by one. + def do_main_loop_work(self, error_queue: queue.Queue) -> SchedulerCycleDelay: + """Return the number of seconds to sleep after returning.""" + if self.loop_state == LoopState.STOPPING: + # This blocks the main loop until the analyses are finished. + self.wait_for_running_analyses_to_finish() + self.loop_state = LoopState.INACTIVE + return SchedulerCycleDelay.SUCCESS - When invoked without arguments: if the internal counter is larger than - zero on entry, decrement it by one and return immediately. If it is zero - on entry, block, waiting until some other thread has called release() to - make it larger than zero. This is done with proper interlocking so that - if multiple acquire() calls are blocked, release() will wake exactly one - of them up. The implementation may pick one at random, so the order in - which blocked threads are awakened should not be relied on. There is no - return value in this case. + if self.loop_state == LoopState.PAUSED: + log.debug("scheduler is paused, send '%s' to process %d to resume", signal.SIGUSR2, os.getpid()) + return SchedulerCycleDelay.SCHEDULER_PAUSED - When invoked with blocking set to true, do the same thing as when called - without arguments, and return true. + if 0 < self.max_analysis_count <= self.total_analysis_count: + log.info("Maximum analysis count has been reached, shutting down.") + self.stop() + return SchedulerCycleDelay.SUCCESS - When invoked with blocking set to false, do not block. If a call without - an argument would block, return false immediately; otherwise, do the - same thing as when called without arguments, and return true. + if self.is_short_on_disk_space(): + return SchedulerCycleDelay.LOW_DISK_SPACE - When invoked with a timeout other than None, it will block for at - most timeout seconds. If acquire does not complete successfully in - that interval, return false. Return true otherwise. 
+ analysis_manager: Optional[AnalysisManager] = None + with self.db.session.begin(): + if self.machinery_manager and self.machinery_manager.running_machines_max_reached(): + return SchedulerCycleDelay.MAX_MACHINES_RUNNING - """ - if not blocking and timeout is not None: - raise ValueError("Cannot specify timeout for non-blocking acquire()") - rc = False - endtime = None - with self._cond: - while self._value == 0: - if not blocking: - break - if timeout is not None: - if endtime is None: - endtime = _time() + timeout - else: - timeout = endtime - _time() - if timeout <= 0: - break - self._cond.wait(timeout) - else: - self._value -= 1 - rc = True - return rc - - __enter__ = acquire - - def release(self): - """Release a semaphore, incrementing the internal counter by one. - - When the counter is zero on entry and another thread is waiting for it - to become larger than zero again, wake up that thread. - - If the number of releases exceeds the number of acquires, - raise a ValueError. + try: + task, machine = self.find_next_serviceable_task() + except Exception: + log.exception("Failed to find next serviceable task") + # Explicitly call rollback since we're not re-raising the exception and letting the + # begin() context manager handle rolling back the transaction. + self.db.session.rollback() + return SchedulerCycleDelay.FAILURE + + if task is None: + # There are no pending tasks so try again in 1 second. + return SchedulerCycleDelay.NO_PENDING_TASKS + + log.info("Task #%s: Processing task", task.id) + self.total_analysis_count += 1 + analysis_manager = AnalysisManager( + task, machine, self.machinery_manager, error_queue, done_callback=self.analysis_finished + ) + analysis_manager.prepare_task_and_machine_to_start() + self.db.session.expunge_all() - """ - with self._cond: - if self._value > self._upper_limit: - raise ValueError("Semaphore released too many times") - if self._value >= self._limit_value: - self._value = self._limit_value - self._cond.notify() - return - self._value += 1 - self._cond.notify() + with self.analysis_threads_lock: + self.analysis_threads.append(analysis_manager) + analysis_manager.start() - def __exit__(self, t, v, tb): - self.release() + return SchedulerCycleDelay.SUCCESS - def update_limit(self, value): - """Update the limit value for the semaphore + def find_next_serviceable_task(self) -> Tuple[Optional[Task], Optional[Machine]]: + task: Optional[Task] = None + machine: Optional[Machine] = None - This limit value is the bounded limit, and proposed limit values - are validated against the upper limit. + if self.machinery_manager: + task, machine = self.find_pending_task_to_service() + else: + task = self.find_pending_task_not_requiring_machinery() + + return task, machine + + def find_pending_task_not_requiring_machinery(self) -> Optional[Task]: + # This function must only be called when we're configured to not process any tasks + # that require machinery. + assert not self.machinery_manager + + task: Optional[Task] = None + tasks = self.db.list_tasks( + category=[category for category in self.analyzing_categories if category not in CATEGORIES_NEEDING_VM], + status=TASK_PENDING, + order_by=(Task.priority.desc(), Task.added_on), + options_not_like="node=", + limit=1, + for_update=True, + ) + if tasks: + task = tasks[0] + return task + + def find_pending_task_to_service(self) -> Tuple[Optional[Task], Optional[Machine]]: + # This function must only be called when we have the ability to use machinery. 
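+ # Walk the pending queue in priority order: take the first task that either needs no VM at
+ # all or has a free, compatible machine. Tasks that no machine in the pool could ever
+ # service are failed or skipped, depending on the fail_unserviceable setting.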
+ assert self.machinery_manager + + task: Optional[Task] = None + machine: Optional[Machine] = None + # Get the list of all pending tasks in the order that they should be processed. + for task_candidate in self.db.list_tasks( + status=TASK_PENDING, + order_by=(Task.priority.desc(), Task.added_on), + options_not_like="node=", + for_update=True, + ): + if task_candidate.category not in CATEGORIES_NEEDING_VM: + # This task can definitely be processed because it doesn't need a machine. + task = task_candidate + break - """ - if value < self._upper_limit and value > 0: - self._limit_value = value - if self._value > value: - self._value = value - - def check_for_starvation(self, available_count: int): - """Check for preventing starvation from coming up after updating the limit. - Take no parameter. - Return true on starvation. - """ - if self._value == 0 and available_count == self._limit_value: - self._value = self._limit_value - return True - # Resync of the lock value - if abs(self._value - available_count) > 0: - self._value = available_count - return True - return False + try: + machine = self.machinery_manager.find_machine_to_service_task(task_candidate) + except CuckooUnserviceableTaskError: + if self.cfg.cuckoo.fail_unserviceable: + log.info("Task #%s: Failing unserviceable task", task_candidate.id) + self.db.set_status(task_candidate.id, TASK_FAILED_ANALYSIS) + else: + log.info("Task #%s: Unserviceable task", task_candidate.id) + continue + if machine: + task = task_candidate + break -class CuckooDeadMachine(Exception): - """Exception thrown when a machine turns dead. + return task, machine - When this exception has been thrown, the analysis task will start again, - and will try to use another machine, when available. - """ + def get_available_machine_stats(self) -> DefaultDict[str, int]: + available_machine_stats = defaultdict(int) + for machine in self.db.get_available_machines(): + for tag in machine.tags: + if tag: + available_machine_stats[tag.name] += 1 + if machine.platform: + available_machine_stats[machine.platform] += 1 - pass + return available_machine_stats + def get_locked_machine_stats(self) -> DefaultDict[str, int]: + locked_machine_stats = defaultdict(int) + for machine in self.db.list_machines(locked=True): + for tag in machine.tags: + if tag: + locked_machine_stats[tag.name] += 1 + if machine.platform: + locked_machine_stats[machine.platform] += 1 -class Scheduler: - """Tasks Scheduler. + return locked_machine_stats - This class is responsible for the main execution loop of the tool. It - prepares the analysis machines and keep waiting and loading for new - analysis tasks. - Whenever a new task is available, it launches AnalysisManager which will - take care of running the full analysis process and operating with the - assigned analysis machine. 
- """ + def get_pending_task_stats(self) -> DefaultDict[str, int]: + pending_task_stats = defaultdict(int) + for task in self.db.list_tasks(status=TASK_PENDING): + for tag in task.tags: + if tag: + pending_task_stats[tag.name] += 1 + if task.platform: + pending_task_stats[task.platform] += 1 + if task.machine: + pending_task_stats[task.machine] += 1 - def __init__(self, maxcount=None): - self.loop_state = LoopState.INACTIVE - self.cfg = Config() - self.db = Database() - self.maxcount = maxcount - self.total_analysis_count = 0 - self.analyzing_categories, self.categories_need_VM = load_categories() - self.analysis_threads = [] + return pending_task_stats + + def is_short_on_disk_space(self): + """If not enough free disk space is available, then we print an + error message and wait another round. This check is ignored + when the freespace configuration variable is set to zero. + """ + if not self.cfg.cuckoo.freespace: + return False + + # Resolve the full base path to the analysis folder, just in + # case somebody decides to make a symbolic link out of it. + dir_path = os.path.join(CUCKOO_ROOT, "storage", "analyses") + need_space, space_available = free_space_monitor(dir_path, return_value=True, analysis=True) + if need_space: + log.error( + "Not enough free disk space! (Only %d MB!). You can change limits it in cuckoo.conf -> freespace", space_available + ) + return need_space + + @contextlib.contextmanager + def loop_signals(self): + signals_to_handle = (signal.SIGHUP, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2) + for sig in signals_to_handle: + signal.signal(sig, self.signal_handler) + try: + yield + finally: + for sig in signals_to_handle: + signal.signal(sig, signal.SIG_DFL) + + def shutdown_machinery(self): + """Shutdown machine manager (used to kill machines that still alive).""" + if self.machinery_manager: + with self.db.session.begin(): + self.machinery_manager.machinery.shutdown() def signal_handler(self, signum, frame): """Scheduler signal handler""" sig = signal.Signals(signum) if sig in (signal.SIGHUP, signal.SIGTERM): log.info("received signal '%s', waiting for remaining analysis to finish before stopping", sig.name) - self.loop_state = LoopState.STOPPING + self.stop() elif sig == signal.SIGUSR1: log.info("received signal '%s', pausing new detonations, running detonations will continue until completion", sig.name) self.loop_state = LoopState.PAUSED @@ -208,356 +277,91 @@ def signal_handler(self, signum, frame): else: log.info("received signal '%s', nothing to do", sig.name) - def initialize(self): - """Initialize the machine manager.""" - global machinery, machine_lock - - machinery_name = self.cfg.cuckoo.machinery - if not self.categories_need_VM: - return - - # Get registered class name. Only one machine manager is imported, - # therefore there should be only one class in the list. - plugin = list_plugins("machinery")[0] - # Initialize the machine manager. - machinery = plugin() - - # Provide a dictionary with the configuration options to the - # machine manager instance. - machinery.set_options(Config(machinery_name)) - - # Initialize the machine manager. 
- try: - machinery.initialize(machinery_name) - except CuckooMachineError as e: - raise CuckooCriticalError(f"Error initializing machines: {e}") - # If the user wants to use the scaling bounded semaphore, check what machinery is specified, and then - # grab the required configuration key for setting the upper limit - if self.cfg.cuckoo.scaling_semaphore: - machinery_opts = machinery.options.get(machinery_name) - if machinery_name == "az": - machines_limit = machinery_opts.get("total_machines_limit") - elif machinery_name == "aws": - machines_limit = machinery_opts.get("dynamic_machines_limit") - # You set this value if you are using a machinery that is NOT auto-scaling - max_vmstartup_count = self.cfg.cuckoo.max_vmstartup_count - if max_vmstartup_count: - # The BoundedSemaphore is used to prevent CPU starvation when starting up multiple VMs - machine_lock = threading.BoundedSemaphore(max_vmstartup_count) - # You set this value if you are using a machinery that IS auto-scaling - elif self.cfg.cuckoo.scaling_semaphore and machines_limit: - # The ScalingBoundedSemaphore is used to keep feeding available machines from the pending tasks queue - machine_lock = ScalingBoundedSemaphore(value=len(machinery.machines()), upper_limit=machines_limit) - else: - machine_lock = threading.Lock() - - log.info( - 'Using "%s" machine manager with max_analysis_count=%d, max_machines_count=%d, and max_vmstartup_count=%d', - machinery_name, - self.cfg.cuckoo.max_analysis_count, - self.cfg.cuckoo.max_machines_count, - self.cfg.cuckoo.max_vmstartup_count, - ) - - # At this point all the available machines should have been identified - # and added to the list. If none were found, Cuckoo needs to abort the - # execution. - - if not len(machinery.machines()): - raise CuckooCriticalError("No machines available") - else: - log.info("Loaded %d machine/s", len(machinery.machines())) - - if len(machinery.machines()) > 1 and self.db.engine.name == "sqlite": - log.warning( - "As you've configured CAPE to execute parallelanalyses, we recommend you to switch to a PostgreSQL database as SQLite might cause some issues" - ) + def start(self): + """Start scheduler.""" + if self.machinery_manager: + with self.db.session.begin(): + self.machinery_manager.initialize_machinery() - # Drop all existing packet forwarding rules for each VM. Just in case - # Cuckoo was terminated for some reason and various forwarding rules - # have thus not been dropped yet. - for machine in machinery.machines(): - if not machine.interface: - log.info( - "Unable to determine the network interface for VM with name %s, Cuckoo will not be able to give it " - "full internet access or route it through a VPN! Please define a default network interface for the " - "machinery or define a network interface for each VM", - machine.name, - ) - continue + # Message queue with threads to transmit exceptions (used as IPC). + error_queue = queue.Queue() - # Drop forwarding rule to each VPN. - for vpn in vpns.values(): - rooter("forward_disable", machine.interface, vpn.interface, machine.ip) + # Start the logger which grabs database information + if self.cfg.cuckoo.periodic_log: + threading.Thread(target=self.thr_periodic_log, name="periodic_log", daemon=True).start() - # Drop forwarding rule to the internet / dirty line. 
- if routing.routing.internet != "none": - rooter("forward_disable", machine.interface, routing.routing.internet, machine.ip) + with self.loop_signals(): + log.info("Waiting for analysis tasks") + self.loop_state = LoopState.RUNNING + try: + while self.loop_state in (LoopState.RUNNING, LoopState.PAUSED, LoopState.STOPPING): + sleep_time = self.do_main_loop_work(error_queue) + time.sleep(sleep_time) + try: + raise error_queue.get(block=False) + except queue.Empty: + pass + finally: + self.loop_state = LoopState.INACTIVE def stop(self): """Set loop state to stopping.""" self.loop_state = LoopState.STOPPING - def shutdown_machinery(self): - """Shutdown machine manager (used to kill machines that still alive).""" - if self.categories_need_VM: - machinery.shutdown() - - def start(self): - """Start scheduler.""" - with self.db.session.begin(): - self.initialize() - - log.info("Waiting for analysis tasks") - - # Handle interrupts - for _signal in [signal.SIGHUP, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2]: - signal.signal(_signal, self.signal_handler) - - # Message queue with threads to transmit exceptions (used as IPC). - errors = queue.Queue() + def thr_periodic_log(self, oneshot=False): + # Ordinarily, this is the entry-point for a child thread. The oneshot parameter makes + # it easier for testing. + if not log.isEnabledFor(logging.DEBUG): + # The only purpose of this function is to log a debug message, so if debug + # logging is disabled, don't bother making all the database queries every 10 + # seconds--just return. + return - # Command-line overrides the configuration file. - if self.maxcount is None: - self.maxcount = self.cfg.cuckoo.max_analysis_count + while True: + # Since we know we'll be logging the resulting message, just use f-strings + # because they're faster and easier to read than using %s/%d and params to + # log.debug(). + msgs = [f"# Active analysis: {self.active_analysis_count}"] - # Start the logger which grabs database information - if self.cfg.cuckoo.periodic_log: - self._thr_periodic_log() - # Update timer for semaphore limit value if enabled - if self.cfg.cuckoo.scaling_semaphore and not self.cfg.cuckoo.max_vmstartup_count: - # Note that this variable only exists under these conditions - scaling_semaphore_timer = time.time() - - if self.cfg.cuckoo.batch_scheduling: - max_batch_scheduling_count = ( - self.cfg.cuckoo.max_batch_count if self.cfg.cuckoo.max_batch_count and self.cfg.cuckoo.max_batch_count > 1 else 5 - ) - # This loop runs forever. - - self.loop_state = LoopState.RUNNING - while self.loop_state in (LoopState.RUNNING, LoopState.PAUSED, LoopState.STOPPING): - # Avoid high CPU utilization due to a tight loop under certain conditions - time.sleep(0.5) - - if self.loop_state == LoopState.STOPPING: - # Wait for analyses to finish before stopping - while self.analysis_threads: - thread = self.analysis_threads.pop() - log.debug("Waiting for analysis PID %d", thread.native_id) - thread.join() - break - if self.loop_state == LoopState.PAUSED: - log.debug("scheduler is paused, send '%s' to process %d to resume", signal.SIGUSR2, os.getpid()) - time.sleep(5) - continue - # Update scaling bounded semaphore limit value, if enabled, based on the number of machines - # Wait until the machine lock is not locked. This is only the case - # when all machines are fully running, rather than "about to start" - # or "still busy starting". 
This way we won't have race conditions - # with finding out there are no available machines in the analysis - # manager or having two analyses pick the same machine. - - # Update semaphore limit value if enabled based on the number of machines - if self.cfg.cuckoo.scaling_semaphore and not self.cfg.cuckoo.max_vmstartup_count: - # Every x seconds, update the semaphore limit. This requires a database call to machinery.availables(), - # hence waiting a bit between calls - if scaling_semaphore_timer + int(self.cfg.cuckoo.scaling_semaphore_update_timer) < time.time(): - machine_lock.update_limit(len(machinery.machines())) - # Prevent full starvation, very unlikely to ever happen. - machine_lock.check_for_starvation(machinery.availables()) - # Note that this variable only exists under these conditions - scaling_semaphore_timer = time.time() - - if self.categories_need_VM: - if not machine_lock.acquire(False): - continue - machine_lock.release() - - # If not enough free disk space is available, then we print an - # error message and wait another round (this check is ignored - # when the freespace configuration variable is set to zero). - if self.cfg.cuckoo.freespace: - # Resolve the full base path to the analysis folder, just in - # case somebody decides to make a symbolic link out of it. - dir_path = os.path.join(CUCKOO_ROOT, "storage", "analyses") - need_space, space_available = free_space_monitor(dir_path, return_value=True, analysis=True) - if need_space: - log.error( - "Not enough free disk space! (Only %d MB!). You can change limits it in cuckoo.conf -> freespace", - space_available, + with self.db.session.begin(): + pending_task_count = self.db.count_tasks(status=TASK_PENDING) + pending_task_stats = self.get_pending_task_stats() + msgs.extend( + ( + f"# Pending Tasks: {pending_task_count}", + f"# Specific Pending Tasks: {dict(pending_task_stats)}", + ) + ) + if self.machinery_manager: + available_machine_count = self.db.count_machines_available() + available_machine_stats = self.get_available_machine_stats() + locked_machine_count = len(self.db.list_machines(locked=True)) + locked_machine_stats = self.get_locked_machine_stats() + total_machine_count = len(self.db.list_machines()) + msgs.extend( + ( + f"# Available Machines: {available_machine_count}", + f"# Available Specific Machines: {dict(available_machine_stats)}", + f"# Locked Machines: {locked_machine_count}", + f"# Specific Locked Machines: {dict(locked_machine_stats)}", + f"# Total Machines: {total_machine_count}", + ) ) - continue + if self.cfg.cuckoo.scaling_semaphore: + lock_value = ( + f"{self.machinery_manager.machine_lock._value}/{self.machinery_manager.machine_lock._limit_value}" + ) + msgs.append(f"# Lock value: {lock_value}") + log.debug("; ".join(msgs)) + + if oneshot: + break - with self.db.session.begin(): - # Have we limited the number of concurrently executing machines? - if self.cfg.cuckoo.max_machines_count > 0 and self.categories_need_VM: - # Are too many running? - if len(machinery.running()) >= self.cfg.cuckoo.max_machines_count: - continue - - # If no machines are available, it's pointless to fetch for pending tasks. Loop over. - # But if we analyze pcaps/static only it's fine - # ToDo verify that it works with static and file/url - if self.categories_need_VM and not machinery.availables(include_reserved=True): - continue - # Exits if max_analysis_count is defined in the configuration - # file and has been reached. 
- if self.maxcount and self.total_analysis_count >= self.maxcount: - if active_analysis_count <= 0: - log.info("Maximum analysis count has been reached, shutting down.") - self.stop() - else: - if self.cfg.cuckoo.batch_scheduling: - tasks_to_create = [] - if self.categories_need_VM: - # First things first, are there pending tasks? - if not self.db.count_tasks(status=TASK_PENDING): - continue - # There are? Great, let's get them, ordered by priority and then oldest to newest - tasks_with_relevant_machine_available = [] - for task in self.db.list_tasks( - status=TASK_PENDING, order_by=(Task.priority.desc(), Task.added_on), options_not_like="node=" - ): - # Can this task ever be serviced? - if not self.db.is_serviceable(task): - if self.cfg.cuckoo.fail_unserviceable: - log.info("Task #%s: Failing unserviceable task", task.id) - self.db.set_status(task.id, TASK_FAILED_ANALYSIS) - continue - log.info("Task #%s: Unserviceable task", task.id) - if self.db.is_relevant_machine_available(task=task, set_status=False): - tasks_with_relevant_machine_available.append(task) - # The batching number is the number of tasks that will be considered to mapping to machines for starting - # Max_batch_scheduling_count is referring to the batch_scheduling config however this number - # is the maximum and capped for each usage by the number of locks available which refer to - # the number of expected available machines. - batching_number = ( - max_batch_scheduling_count - if machine_lock._value > max_batch_scheduling_count - else machine_lock._value - ) - if len(tasks_with_relevant_machine_available) > batching_number: - tasks_with_relevant_machine_available = tasks_with_relevant_machine_available[:batching_number] - tasks_to_create = self.db.map_tasks_to_available_machines(tasks_with_relevant_machine_available) - else: - tasks_to_create = [] - while True: - task = self.db.fetch_task(self.analyzing_categories) - if not task: - break - else: - tasks_to_create.append(task) - for task in tasks_to_create: - task = self.db.view_task(task.id) - log.debug("Task #%s: Processing task", task.id) - self.total_analysis_count += 1 - # Initialize and start the analysis manager. - analysis = AnalysisManager(task, errors) - analysis.daemon = True - analysis.start() - self.analysis_threads.append(analysis) - # We only want to keep track of active threads - self.analysis_threads = [t for t in self.analysis_threads if t.is_alive()] - else: - if self.categories_need_VM: - # First things first, are there pending tasks? - if not self.db.count_tasks(status=TASK_PENDING): - continue - relevant_machine_is_available = False - # There are? Great, let's get them, ordered by priority and then oldest to newest - for task in self.db.list_tasks( - status=TASK_PENDING, order_by=(Task.priority.desc(), Task.added_on), options_not_like="node=" - ): - # Can this task ever be serviced? 
- if not self.db.is_serviceable(task): - if self.cfg.cuckoo.fail_unserviceable: - log.debug("Task #%s: Failing unserviceable task", task.id) - self.db.set_status(task.id, TASK_FAILED_ANALYSIS) - continue - log.debug("Task #%s: Unserviceable task", task.id) - relevant_machine_is_available = self.db.is_relevant_machine_available(task) - if relevant_machine_is_available: - break - if not relevant_machine_is_available: - task = None - else: - task = self.db.view_task(task.id) - else: - task = self.db.fetch_task(self.analyzing_categories) - if task: - # Make sure that changes to the status of the task is flushed to the - # database before passing the object off to the child thread. - self.db.session.flush() - self.db.session.expunge_all() - log.debug("Task #%s: Processing task", task.id) - self.total_analysis_count += 1 - # Initialize and start the analysis manager. - analysis = AnalysisManager(task, errors) - analysis.daemon = True - analysis.start() - self.analysis_threads.append(analysis) - # We only want to keep track of active threads - self.analysis_threads = [t for t in self.analysis_threads if t.is_alive()] - - # Deal with errors. - try: - raise errors.get(block=False) - except queue.Empty: - pass - self.loop_state = LoopState.INACTIVE + time.sleep(10) - def _thr_periodic_log(self): - specific_available_machine_counts = defaultdict(int) - for machine in self.db.get_available_machines(): - for tag in machine.tags: - if tag: - specific_available_machine_counts[tag.name] += 1 - if machine.platform: - specific_available_machine_counts[machine.platform] += 1 - specific_pending_task_counts = defaultdict(int) - for task in self.db.list_tasks(status=TASK_PENDING): - for tag in task.tags: - if tag: - specific_pending_task_counts[tag.name] += 1 - if task.platform: - specific_pending_task_counts[task.platform] += 1 - if task.machine: - specific_pending_task_counts[task.machine] += 1 - specific_locked_machine_counts = defaultdict(int) - for machine in self.db.list_machines(locked=True): - for tag in machine.tags: - if tag: - specific_locked_machine_counts[tag.name] += 1 - if machine.platform: - specific_locked_machine_counts[machine.platform] += 1 - if self.cfg.cuckoo.scaling_semaphore: - number_of_machine_scheduled = machinery.get_machines_scheduled() - log.debug( - "# Pending Tasks: %d; # Specific Pending Tasks: %s; # Available Machines: %d; # Available Specific Machines: %s; # Locked Machines: %d; # Specific Locked Machines: %s; # Total Machines: %d; Lock value: %d/%d; # Active analysis: %d; # Machines scheduled: %d", - self.db.count_tasks(status=TASK_PENDING), - dict(specific_pending_task_counts), - self.db.count_machines_available(), - dict(specific_available_machine_counts), - len(self.db.list_machines(locked=True)), - dict(specific_locked_machine_counts), - len(self.db.list_machines()), - machine_lock._value, - machine_lock._limit_value, - active_analysis_count, - number_of_machine_scheduled, - ) - else: - log.debug( - "# Pending Tasks: %d; # Specific Pending Tasks: %s; # Available Machines: %d; # Available Specific Machines: %s; # Locked Machines: %d; # Specific Locked Machines: %s; # Total Machines: %d", - self.db.count_tasks(status=TASK_PENDING), - dict(specific_pending_task_counts), - self.db.count_machines_available(), - dict(specific_available_machine_counts), - len(self.db.list_machines(locked=True)), - dict(specific_locked_machine_counts), - len(self.db.list_machines()), - ) - thr = threading.Timer(10, self._thr_periodic_log) - thr.daemon = True - thr.start() + def 
wait_for_running_analyses_to_finish(self) -> None: + log.info("Waiting for running analyses to finish.") + while self.analysis_threads: + thread = self.analysis_threads.pop() + log.debug("Waiting for analysis thread (%r)", thread) + thread.join() diff --git a/modules/machinery/aws.py b/modules/machinery/aws.py index a8709eeac2b..a9471d43641 100644 --- a/modules/machinery/aws.py +++ b/modules/machinery/aws.py @@ -2,6 +2,8 @@ import sys import time +from lib.cuckoo.core.database import Machine + try: import boto3 except ImportError: @@ -30,8 +32,7 @@ class AWS(Machinery): AUTOSCALE_CUCKOO = "AUTOSCALE_CUCKOO" - def __init__(self): - super(AWS, self).__init__() + module_name = "aws" """override Machinery method""" @@ -176,13 +177,11 @@ def _allocate_new_machine(self): """override Machinery method""" - def acquire(self, machine_id=None, platform=None, tags=None, arch=None, os_version=None, need_scheduled=False): + def scale_pool(self, machine: Machine) -> None: """ override Machinery method to utilize the auto scale option """ - base_class_return_value = super(AWS, self).acquire(machine_id, platform, tags, need_scheduled=need_scheduled) self._start_or_create_machines() # prepare another machine - return base_class_return_value def _start_or_create_machines(self): """ @@ -295,14 +294,15 @@ def stop(self, label): """override Machinery method""" - def release(self, label=None): + def release(self, machine: Machine) -> Machine: """ we override it to have the ability to run start_or_create_machines() after unlocking the last machine Release a machine. @param label: machine label. """ - super(AWS, self).release(label) + retval = super(AWS, self).release(machine) self._start_or_create_machines() + return retval def _create_instance(self, tags): """ diff --git a/modules/machinery/az.py b/modules/machinery/az.py index a2288322f98..2074e242c3e 100644 --- a/modules/machinery/az.py +++ b/modules/machinery/az.py @@ -88,6 +88,7 @@ class Azure(Machinery): + module_name = "az" # Resource tag that indicates auto-scaling. AUTO_SCALE_CAPE_KEY = "AUTO_SCALE_CAPE" @@ -103,15 +104,14 @@ class Azure(Machinery): WINDOWS_PLATFORM = "windows" LINUX_PLATFORM = "linux" - def _initialize(self, module_name): + def _initialize(self): """ Overloading abstracts.py:_initialize() Read configuration. @param module_name: module name @raise CuckooDependencyError: if there is a problem with the dependencies call """ - self.module_name = module_name - mmanager_opts = self.options.get(module_name) + mmanager_opts = self.options.get(self.module_name) if not isinstance(mmanager_opts["scale_sets"], list): mmanager_opts["scale_sets"] = mmanager_opts["scale_sets"].strip().split(",") @@ -150,7 +150,6 @@ def _initialize_check(self): """ Overloading abstracts.py:_initialize_check() Running checks against Azure that the configuration is correct. - @param module_name: module name, currently not used be required @raise CuckooDependencyError: if there is a problem with the dependencies call """ if not HAVE_AZURE: @@ -483,31 +482,6 @@ def availables(self, label=None, platform=None, tags=None, arch=None, include_re label=label, platform=platform, tags=tags, arch=arch, include_reserved=include_reserved, os_version=os_version ) - def acquire(self, machine_id=None, platform=None, tags=None, arch=None, os_version=[], need_scheduled=False): - """ - Overloading abstracts.py:acquire() to utilize the auto-scale option. 
- @param machine_id: the name of the machine to be acquired - @param platform: the platform of the machine's operating system to be acquired - @param tags: any tags that are associated with the machine to be acquired - @param arch: the architecture of the operating system - @return: dict representing machine object from DB - """ - base_class_return_value = super(Azure, self).acquire( - machine_id=machine_id, platform=platform, tags=tags, arch=arch, os_version=os_version, need_scheduled=need_scheduled - ) - if base_class_return_value and base_class_return_value.name: - vmss_name, _ = base_class_return_value.name.split("_") - - # Get the VMSS name by the tag - if not machine_pools[vmss_name]["is_scaling"]: - # Start it and forget about it - threading.Thread( - target=self._thr_scale_machine_pool, - args=(self.options.az.scale_sets[vmss_name].pool_tag, True if platform else False), - ).start() - - return base_class_return_value - def _add_machines_to_db(self, vmss_name): """ Adding machines to database that did not exist there before. diff --git a/modules/machinery/esx.py b/modules/machinery/esx.py index 23289cd2f92..4e348b5c5fb 100644 --- a/modules/machinery/esx.py +++ b/modules/machinery/esx.py @@ -12,6 +12,8 @@ class ESX(LibVirtMachinery): """Virtualization layer for ESXi/ESX based on python-libvirt.""" + module_name = "esx" + def _initialize_check(self): """Runs all checks when a machine manager is initialized. @raise CuckooMachineError: if configuration is invalid diff --git a/modules/machinery/kvm.py b/modules/machinery/kvm.py index f167279a34f..d3b3637c807 100644 --- a/modules/machinery/kvm.py +++ b/modules/machinery/kvm.py @@ -11,6 +11,8 @@ class KVM(LibVirtMachinery): """Virtualization layer for KVM based on python-libvirt.""" + module_name = "kvm" + def _initialize_check(self): """Runs all checks when a machine manager is initialized. @raise CuckooMachineError: if configuration is invalid diff --git a/modules/machinery/multi.py b/modules/machinery/multi.py index 6c8a7f34163..333a412105a 100644 --- a/modules/machinery/multi.py +++ b/modules/machinery/multi.py @@ -28,6 +28,8 @@ def import_plugin(name): class MultiMachinery(Machinery): + module_name = "multi" + LABEL = "mm_label" _machineries = {} diff --git a/modules/machinery/physical.py b/modules/machinery/physical.py index 267862b5017..495fe906b8e 100644 --- a/modules/machinery/physical.py +++ b/modules/machinery/physical.py @@ -27,6 +27,8 @@ class Physical(Machinery): """Manage physical sandboxes.""" + module_name = "physical" + # Physical machine states. RUNNING = "running" STOPPED = "stopped" diff --git a/modules/machinery/proxmox.py b/modules/machinery/proxmox.py index 6f981c6f366..b4387d2a86a 100644 --- a/modules/machinery/proxmox.py +++ b/modules/machinery/proxmox.py @@ -25,8 +25,10 @@ class Proxmox(Machinery): """Manage Proxmox sandboxes.""" + module_name = "proxmox" + def __init__(self): - super(Proxmox, self).__init__() + super().__init__() self.timeout = int(cfg.timeouts.vm_state) def _initialize_check(self): diff --git a/modules/machinery/qemu.py b/modules/machinery/qemu.py index 55e72b032ea..6061c1a7e48 100644 --- a/modules/machinery/qemu.py +++ b/modules/machinery/qemu.py @@ -323,13 +323,15 @@ class QEMU(Machinery): """Virtualization layer for QEMU (non-KVM).""" + module_name = "qemu" + # VM states. 
RUNNING = "running"
     STOPPED = "stopped"
     ERROR = "machete"
 
     def __init__(self):
-        super(QEMU, self).__init__()
+        super().__init__()
         self.state = {}
 
     def _initialize_check(self):
diff --git a/modules/machinery/virtualbox.py b/modules/machinery/virtualbox.py
index fe7bb016562..71c343ef3e5 100644
--- a/modules/machinery/virtualbox.py
+++ b/modules/machinery/virtualbox.py
@@ -23,6 +23,8 @@
 class VirtualBox(Machinery):
     """Virtualization layer for VirtualBox."""
 
+    module_name = "virtualbox"
+
     # VM states.
     SAVED = "saved"
     RUNNING = "running"
diff --git a/modules/machinery/vmware.py b/modules/machinery/vmware.py
index ba3c7555a64..222f1ed1a12 100644
--- a/modules/machinery/vmware.py
+++ b/modules/machinery/vmware.py
@@ -20,6 +20,8 @@
 class VMware(Machinery):
     """Virtualization layer for VMware Workstation using vmrun utility."""
 
+    module_name = "vmware"
+
     LABEL = "vmx_path"
 
     def _initialize_check(self):
diff --git a/modules/machinery/vmwarerest.py b/modules/machinery/vmwarerest.py
index 37c180ffcca..9131b4f413b 100644
--- a/modules/machinery/vmwarerest.py
+++ b/modules/machinery/vmwarerest.py
@@ -18,6 +18,8 @@
 class VMwareREST(Machinery):
     """Virtualization layer for remote VMware REST Server."""
 
+    module_name = "vmwarerest"
+
     LABEL = "id"
 
     def _initialize_check(self):
diff --git a/modules/machinery/vmwareserver.py b/modules/machinery/vmwareserver.py
index 1b6aeee180d..7473b2be241 100644
--- a/modules/machinery/vmwareserver.py
+++ b/modules/machinery/vmwareserver.py
@@ -16,6 +16,8 @@
 class VMwareServer(Machinery):
     """Virtualization layer for remote VMware Workstation Server using vmrun utility."""
 
+    module_name = "vmwareserver"
+
     LABEL = "vmx_path"
 
     def _initialize_check(self):
diff --git a/modules/machinery/vsphere.py b/modules/machinery/vsphere.py
index 44d1484eb04..600d19499f4 100644
--- a/modules/machinery/vsphere.py
+++ b/modules/machinery/vsphere.py
@@ -31,6 +31,8 @@
 class vSphere(Machinery):
     """vSphere/ESXi machinery class based on pyVmomi Python SDK."""
 
+    module_name = "vsphere"
+
     # VM states
     RUNNING = "poweredOn"
     POWEROFF = "poweredOff"
@@ -41,13 +43,12 @@ def __init__(self):
         if not HAVE_PYVMOMI:
             raise CuckooDependencyError("Couldn't import pyVmomi. Please install using 'pip3 install --upgrade pyvmomi'")
 
-        super(vSphere, self).__init__()
+        super().__init__()
 
-    def _initialize(self, module_name):
+    def _initialize(self):
         """Read configuration.
-        @param module_name: module name.
         """
-        super(vSphere, self)._initialize(module_name)
+        super()._initialize()
 
         # Initialize random number generator
         random.seed()
diff --git a/utils/process.py b/utils/process.py
index 9add1f7e63e..4f118b858db 100644
--- a/utils/process.py
+++ b/utils/process.py
@@ -303,6 +303,8 @@ def autoprocess(
        tasks = db.list_tasks(status=TASK_FAILED_PROCESSING, limit=parallel, order_by=Task.completed_on.asc())
    else:
        tasks = db.list_tasks(status=TASK_COMPLETED, limit=parallel, order_by=Task.completed_on.asc())
+        # Make sure the tasks are available as normal objects after the transaction ends, so that
+        # SQLAlchemy doesn't auto-begin a new transaction the next time they are accessed.
         db.session.expunge_all()
         added = False
         # For loop to add only one, nice. (reason is that we shouldn't overshoot maxcount)
@@ -497,6 +499,8 @@ def main():
                 if path_exists(sample):
                     task.__setattr__("target", sample)
                     break
+            # Make sure that SQLAlchemy doesn't auto-begin a new transaction the next time
+            # these objects are accessed.
db.session.expunge_all() if args.signatures: diff --git a/web/apiv2/views.py b/web/apiv2/views.py index 3077d17a209..5ebc3c4abbf 100644 --- a/web/apiv2/views.py +++ b/web/apiv2/views.py @@ -53,7 +53,7 @@ statistics, validate_task, ) -from lib.cuckoo.core.database import TASK_RECOVERED, TASK_RUNNING, Database, Task +from lib.cuckoo.core.database import TASK_RECOVERED, TASK_RUNNING, Database, Task, _Database from lib.cuckoo.core.rooter import _load_socks5_operational, vpns try: @@ -99,7 +99,7 @@ es_as_db = True es = elastic_handler -db = Database() +db: _Database = Database() # Conditional decorator for web authentication
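
The machinery hunks above all follow one pattern: module_name becomes a class
attribute that the base class uses to locate the module's configuration,
per-module acquire() overrides give way to a scale_pool() hook, and release()
passes Machine objects around instead of labels. Below is a minimal sketch of a
module written against the new interface; the "mycloud" name and the
_start_or_create_machines() helper are hypothetical stand-ins, not code from
this patch.

# Sketch only: a hypothetical machinery module under the new conventions.
from lib.cuckoo.common.abstracts import Machinery
from lib.cuckoo.core.database import Machine


class MyCloud(Machinery):
    """Hypothetical auto-scaling machinery module."""

    # Declared on the class; the base class reads conf/mycloud.conf based on
    # this attribute instead of having the name passed into _initialize().
    module_name = "mycloud"

    def scale_pool(self, machine: Machine) -> None:
        # Replaces the old acquire() override: top the pool back up after a
        # machine has been handed out.
        self._start_or_create_machines()

    def release(self, machine: Machine) -> Machine:
        # release() now takes and returns a Machine object rather than a label.
        machine = super().release(machine)
        self._start_or_create_machines()
        return machine

    def _start_or_create_machines(self) -> None:
        # Hypothetical helper: provisioning logic (e.g. starting stopped
        # instances) would go here.
        ...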
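
The expunge_all() comments added to utils/process.py are illustrated by the
standalone sketch below; the Task model and in-memory SQLite database are
stand-ins, not CAPE's real schema. Once instances are expunged, reading their
already-loaded attributes is a plain attribute access, so SQLAlchemy has no
reason to begin a new transaction behind the session's back.

# Sketch only: why detaching ORM objects avoids implicit new transactions.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Task(Base):
    __tablename__ = "tasks"
    id = Column(Integer, primary_key=True)
    status = Column(String(16))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Task(status="completed"))
    session.commit()

with Session(engine) as session:
    tasks = session.query(Task).all()  # attributes are loaded here
    session.expunge_all()  # detach every instance from the session

# The transaction is over; accessing already-loaded attributes on the
# detached objects does not touch the database again.
print(tasks[0].status)  # -> "completed"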