Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring Azure machinery auto-scaling up to date. #2243

Merged
merged 9 commits into from
Jul 23, 2024
8 changes: 8 additions & 0 deletions conf/default/az.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ total_machines_limit = 50
# Specify the machine's instance type(for example, Standard_F2s_v2, Standard_DS3_v2)
instance_type = <instance_type>

# Determines which type of ephemeral OS disk to use. Be sure to research which VM sizes support which placement type.
# Possible values are CacheDisk, ResourceDisk, and NvmeDisk. Defaults to CacheDisk
# https://learn.microsoft.com/en-us/rest/api/compute/virtual-machines/list?view=rest-compute-2024-03-02&tabs=HTTP#diffdisksettings
ephemeral_os_disk_placement =

# This boolean flag is used to indicate if we want to programmatically determine how many cores are used
# per VM of the instance_type mentioned above.
# NOTE: If enabled, this is a long call that takes ~ 1 minute to complete. It takes place at
Expand Down Expand Up @@ -102,6 +107,9 @@ overprovision = 0
# such that we can perform bulk reimaging.
wait_time_to_reimage = 4

# The interval, in seconds, between checks of whether the VMSSs should be scaled up or down.
monitor_rate = 300

# This boolean value is used to indicate if we want to use Azure Spot instances rather than
# normal instances
spot_instances = false
Expand Down
2 changes: 1 addition & 1 deletion lib/cuckoo/core/guest.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def wait_for_completion(self):

if status["status"] in ("complete", "failed"):
completed_as = "completed successfully" if status["status"] == "complete" else "failed"
log.info("Task #%s: Analysis %s (id=%s, ip=%s)", completed_as, self.task_id, self.vmid, self.ipaddr)
log.info("Task #%s: Analysis %s (id=%s, ip=%s)", self.task_id, completed_as, self.vmid, self.ipaddr)
self.set_status_in_db("complete")
return
elif status["status"] == "exception":
Expand Down
18 changes: 12 additions & 6 deletions lib/cuckoo/core/machinery_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def __init__(self):
self.machinery_name: str = self.cfg.cuckoo.machinery
self.machinery: Machinery = self.create_machinery()
self.pool_scaling_lock = threading.Lock()
self.machines_limit: Optional[int] = None
if self.machinery.module_name != self.machinery_name:
raise CuckooCriticalError(
f"Incorrect machinery module was imported. "
Expand Down Expand Up @@ -179,15 +180,14 @@ def create_machine_lock(self) -> MachineryLockType:
# If the user wants to use the scaling bounded semaphore, check what machinery is specified, and then
# grab the required configuration key for setting the upper limit
machinery_opts = self.machinery.options.get(self.machinery_name)
machines_limit: int = 0
if self.machinery_name == "az":
machines_limit = machinery_opts.get("total_machines_limit")
self.machines_limit = machinery_opts.get("total_machines_limit")
elif self.machinery_name == "aws":
machines_limit = machinery_opts.get("dynamic_machines_limit")
if machines_limit:
self.machines_limit = machinery_opts.get("dynamic_machines_limit")
if self.machines_limit:
# The ScalingBoundedSemaphore is used to keep feeding available machines from the pending tasks queue
log.info("upper limit for ScalingBoundedSemaphore = %d", machines_limit)
retval = ScalingBoundedSemaphore(value=len(self.machinery.machines()), upper_limit=machines_limit)
log.info("upper limit for ScalingBoundedSemaphore = %d", self.machines_limit)
retval = ScalingBoundedSemaphore(value=len(self.machinery.machines()), upper_limit=self.machines_limit)
else:
log.warning(
"scaling_semaphore is set but the %s machinery does not set the machines limit. Ignoring scaling semaphore.",
Expand Down Expand Up @@ -295,6 +295,12 @@ def scale_pool(self, machine: Machine) -> None:
self.machinery.scale_pool(machine)

def start_machine(self, machine: Machine) -> None:
    """Start the given machine, gated by the machinery lock/semaphore.

    @param machine: the Machine (DB row) whose VM should be started.
    """
    # When auto-scaling is in use (ScalingBoundedSemaphore), the pool may have
    # grown since the semaphore was created. If the semaphore is exhausted
    # (_value == 0) but we are still under the configured machine limit,
    # release once so the acquire below does not block indefinitely.
    # NOTE(review): peeking at the private _value attribute is inherently
    # racy between threads — presumably acceptable here because a spurious
    # release is bounded by the semaphore's upper_limit; confirm.
    # NOTE(review): assumes machines_limit is not None whenever machine_lock
    # is a ScalingBoundedSemaphore (both are set together in
    # create_machine_lock) — TODO confirm.
    if (
        isinstance(self.machine_lock, ScalingBoundedSemaphore)
        and self.db.count_machines_running() <= self.machines_limit
        and self.machine_lock._value == 0
    ):
        self.machine_lock.release()
    # Acquire a slot (plain Lock or semaphore) for the duration of the start call.
    with self.machine_lock:
        self.machinery.start(machine.label)

Expand Down
29 changes: 22 additions & 7 deletions modules/machinery/az.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
CuckooMachineError,
CuckooOperationalError,
)
from lib.cuckoo.core.database import TASK_PENDING
from lib.cuckoo.core.database import TASK_PENDING, Machine

# Only log INFO or higher from imported python packages
logging.getLogger("adal-python").setLevel(logging.INFO)
Expand Down Expand Up @@ -104,16 +104,23 @@ class Azure(Machinery):
WINDOWS_PLATFORM = "windows"
LINUX_PLATFORM = "linux"

def set_options(self, options: dict) -> None:
    """Store the machine manager configuration on this machinery instance.

    @param options: machine manager options dict.
    """
    self.options = options

def _initialize(self):
"""
Overloading abstracts.py:_initialize()
Read configuration.
@param module_name: module name
@raise CuckooDependencyError: if there is a problem with the dependencies call
"""
# Using "scale_sets" here instead of "machines" to avoid KeyError
mmanager_opts = self.options.get(self.module_name)
if not isinstance(mmanager_opts["scale_sets"], list):
mmanager_opts["scale_sets"] = mmanager_opts["scale_sets"].strip().split(",")
mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")

# Replace a list of IDs with dictionary representations
scale_sets = mmanager_opts.pop("scale_sets")
Expand Down Expand Up @@ -238,7 +245,7 @@ def _thr_machine_pool_monitor(self):
threading.Thread(target=self._thr_scale_machine_pool, args=(vals["tag"],)).start()

# Check the machine pools every 5 minutes
threading.Timer(300, self._thr_machine_pool_monitor).start()
threading.Timer(self.options.az.monitor_rate, self._thr_machine_pool_monitor).start()

def _set_vmss_stage(self):
"""
Expand Down Expand Up @@ -461,8 +468,13 @@ def stop(self, label):
time.sleep(5)
with reimage_lock:
label_in_reimage_vm_list = label in [f"{vm['vmss']}_{vm['id']}" for vm in reimage_vm_list]

def release(self, machine: Machine):
vmss_name = machine.label.split("_")[0]
if machine_pools[vmss_name]["is_scaling_down"]:
self.delete_machine(machine.label)
else:
self.delete_machine(label)
_ = super(Azure, self).release(machine)

def availables(self, label=None, platform=None, tags=None, arch=None, include_reserved=False, os_version=[]):
"""
Expand Down Expand Up @@ -619,7 +631,7 @@ def delete_machine(self, label, delete_from_vmss=True):
"""
global vms_currently_being_deleted

_ = super(Azure, self).delete_machine(label)
super(Azure, self).delete_machine(label)

if delete_from_vmss:
vmss_name, instance_id = label.split("_")
Expand Down Expand Up @@ -716,7 +728,10 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
managed_disk=vmss_managed_disk,
# Ephemeral disk time
caching="ReadOnly",
diff_disk_settings=models.DiffDiskSettings(option="Local"),
diff_disk_settings=models.DiffDiskSettings(
option="Local",
placement=self.options.az.ephemeral_os_disk_placement
),
)
vmss_storage_profile = models.VirtualMachineScaleSetStorageProfile(
image_reference=vmss_image_ref,
Expand Down Expand Up @@ -1110,7 +1125,7 @@ def _get_relevant_machines(self, tag):
"""
# The number of relevant machines are those from the list of locked and unlocked machines
# that have the correct tag in their name
return [machine for machine in self.db.list_machines() if tag in machine.label]
return [machine for machine in self.db.list_machines([tag])]

@staticmethod
def _wait_for_concurrent_operations_to_complete():
Expand Down
5 changes: 3 additions & 2 deletions web/submission/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,15 +537,16 @@ def index(request, task_id=None, resubmit_hash=None):
# load multi machinery tags:
# Get enabled machinery
machinery = cfg.cuckoo.get("machinery")
machinery_tags = "scale_sets" if machinery == "az" else "machines"
if machinery == "multi":
for mmachinery in Config(machinery).multi.get("machinery").split(","):
vms = [x.strip() for x in getattr(Config(mmachinery), mmachinery).get("machines").split(",") if x.strip()]
vms = [x.strip() for x in getattr(Config(mmachinery), mmachinery).get(machinery_tags).split(",") if x.strip()]
if any(["tags" in list(getattr(Config(mmachinery), vmtag).keys()) for vmtag in vms]):
enabledconf["tags"] = True
break
else:
# Get VM names for machinery config elements
vms = [x.strip() for x in str(getattr(Config(machinery), machinery).get("machines")).split(",") if x.strip()]
vms = [x.strip() for x in str(getattr(Config(machinery), machinery).get(machinery_tags)).split(",") if x.strip()]
# Check each VM config element for tags
if any(["tags" in list(getattr(Config(machinery), vmtag).keys()) for vmtag in vms]):
enabledconf["tags"] = True
Expand Down