diff --git a/cuckoo/core/scheduler.py b/cuckoo/core/scheduler.py index edd6214e41..3d14bb0adb 100644 --- a/cuckoo/core/scheduler.py +++ b/cuckoo/core/scheduler.py @@ -20,7 +20,7 @@ ) from cuckoo.common.objects import File from cuckoo.common.files import Folders -from cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED +from cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED, TASK_RUNNING, TASK_PENDING from cuckoo.core.guest import GuestManager from cuckoo.core.plugins import RunAuxiliary, RunProcessing from cuckoo.core.plugins import RunSignatures, RunReporting @@ -162,7 +162,7 @@ def acquire_machine(self): # In some cases it's possible that we enter this loop without # having any available machines. We should make sure this is not # such case, or the analysis task will fail completely. - if not machinery.availables(): + if not machinery.availables(label=self.task.machine, platform=self.task.platform, tags=self.task.tags): machine_lock.release() time.sleep(1) continue @@ -965,11 +965,18 @@ def _cleanup_managers(self): cleaned.add(am) return cleaned + def _thr_periodic_log(self): + log.debug("# Tasks: %d; # Available Machines: %d; # Locked Machines: %d; # Total Machines: %d;", + self.db.count_tasks(status=TASK_PENDING), self.db.count_machines_available(), + len(self.db.list_machines(locked=True)), len(self.db.list_machines())) + threading.Timer(10, self._thr_periodic_log).start() + def start(self): """Start scheduler.""" self.initialize() log.info("Waiting for analysis tasks.") + self._thr_periodic_log() # Message queue with threads to transmit exceptions (used as IPC). errors = Queue.Queue() @@ -978,27 +985,12 @@ def start(self): if self.maxcount is None: self.maxcount = self.cfg.cuckoo.max_analysis_count + launched_analysis = True # This loop runs forever. while self.running: - time.sleep(1) - - # Run cleanup on finished analysis managers and untrack them - for am in self._cleanup_managers(): - self.analysis_managers.discard(am) - - # Wait until the machine lock is not locked. This is only the case - # when all machines are fully running, rather that about to start - # or still busy starting. This way we won't have race conditions - # with finding out there are no available machines in the analysis - # manager or having two analyses pick the same machine. - if not machine_lock.acquire(False): - logger( - "Could not acquire machine lock", - action="scheduler.machine_lock", status="busy" - ) - continue - - machine_lock.release() + if not launched_analysis: + time.sleep(1) + launched_analysis = False # If not enough free disk space is available, then we print an # error message and wait another round (this check is ignored @@ -1064,28 +1056,50 @@ def start(self): ) continue - # Fetch a pending analysis task. - # TODO This fixes only submissions by --machine, need to add - # other attributes (tags etc). - # TODO We should probably move the entire "acquire machine" logic - # from the Analysis Manager to the Scheduler and then pass the - # selected machine onto the Analysis Manager instance. - task, available = None, False - for machine in self.db.get_available_machines(): - task = self.db.fetch(machine=machine.name) - if task: - break - - if machine.is_analysis(): + # Get all tasks in the queue + tasks = self.db.list_tasks(status=TASK_PENDING, details=True) + if not tasks: + continue + + for task in tasks: + # Run cleanup on finished analysis managers and untrack them + for am in self._cleanup_managers(): + self.analysis_managers.discard(am) + + # Wait until the machine lock is not locked. This is only the case + # when all machines are fully running, rather that about to start + # or still busy starting. This way we won't have race conditions + # with finding out there are no available machines in the analysis + # manager or having two analyses pick the same machine. + if not machine_lock.acquire(False): + logger( + "Could not acquire machine lock", + action="scheduler.machine_lock", status="busy" + ) + continue + + machine_lock.release() + + available = False + # Note that label > platform > tags + if task.machine: + if machinery.availables(label=task.machine): + available = True + elif task.platform: + if machinery.availables(platform=task.platform): + available = True + elif task.tags: + tag_names = [tag.name for tag in task.tags] + if machinery.availables(tags=tag_names): + available = True + else: available = True - # We only fetch a new task if at least one of the available - # machines is not a "service" machine (again, please refer to the - # services auxiliary module for more information on service VMs). - if not task and available: - task = self.db.fetch(service=False) + if not available: + continue + + self.db.set_status(task.id, TASK_RUNNING) - if task: log.debug("Processing task #%s", task.id) self.total_analysis_count += 1 @@ -1094,7 +1108,7 @@ def start(self): analysis.daemon = True analysis.start() self.analysis_managers.add(analysis) - + launched_analysis = True # Deal with errors. try: raise errors.get(block=False) diff --git a/cuckoo/machinery/kvm.py b/cuckoo/machinery/kvm.py index 9dd84bd0b8..5e9517f3e1 100644 --- a/cuckoo/machinery/kvm.py +++ b/cuckoo/machinery/kvm.py @@ -2,9 +2,13 @@ # Copyright (C) 2014-2016 Cuckoo Foundation. # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org # See the file 'docs/LICENSE' for copying permission. +import logging from cuckoo.common.abstracts import LibVirtMachinery from cuckoo.common.exceptions import CuckooCriticalError from cuckoo.common.exceptions import CuckooMachineError +from cuckoo.core.database import Machine +from sqlalchemy.exc import SQLAlchemyError +log = logging.getLogger(__name__) try: import libvirt @@ -33,4 +37,29 @@ def _connect(self): def _disconnect(self, conn): """Disconnect, ignore request to disconnect.""" - pass \ No newline at end of file + pass + + def availables(self, label=None, platform=None, tags=None): + if all(param is None for param in [label, platform, tags]): + return super(KVM, self).availables() + else: + return self._get_specific_availables(label=label, platform=platform, tags=tags) + + def _get_specific_availables(self, label=None, platform=None, tags=None): + session = self.db.Session() + try: + machines = session.query(Machine) + # Note that label > platform > tags + if label: + machines = machines.filter_by(locked=False).filter_by(label=label) + elif platform: + machines = machines.filter_by(locked=False).filter_by(platform=platform) + elif tags: + for tag in tags: + machines = machines.filter_by(locked=False).filter(Machine.tags.any(name=tag)) + return machines.count() + except SQLAlchemyError as e: + log.exception("Database error getting specific available machines: {0}".format(e)) + return 0 + finally: + session.close()