Skip to content

Commit 3b4dfd7

Browse files
Bring Azure machinery auto-scaling up to date. (#2243)
1 parent 36b6242 commit 3b4dfd7

File tree

5 files changed

+46
-16
lines changed

5 files changed

+46
-16
lines changed

conf/default/az.conf.default

+8
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ total_machines_limit = 50
5959
# Specify the machine's instance type (for example, Standard_F2s_v2, Standard_DS3_v2)
6060
instance_type = <instance_type>
6161

62+
# Determines which type of ephemeral OS disk to use. Be sure to research which machine sizes support which placement type.
63+
# Possible values are CacheDisk, ResourceDisk, and NvmeDisk. Defaults to CacheDisk
64+
# https://learn.microsoft.com/en-us/rest/api/compute/virtual-machines/list?view=rest-compute-2024-03-02&tabs=HTTP#diffdisksettings
65+
ephemeral_os_disk_placement =
66+
6267
# This boolean flag is used to indicate if we want to programmatically determine how many cores are used
6368
# per VM of the instance_type mentioned above.
6469
# NOTE: If enabled, this is a long call that takes ~ 1 minute to complete. It takes place at
@@ -102,6 +107,9 @@ overprovision = 0
102107
# such that we can perform bulk reimaging.
103108
wait_time_to_reimage = 4
104109

110+
# This value, in seconds, is the wait time between successive checks of whether VMSSs should be scaled up or down.
111+
monitor_rate = 300
112+
105113
# This boolean value is used to indicate if we want to use Azure Spot instances rather than
106114
# normal instances
107115
spot_instances = false

lib/cuckoo/core/guest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ def wait_for_completion(self):
393393

394394
if status["status"] in ("complete", "failed"):
395395
completed_as = "completed successfully" if status["status"] == "complete" else "failed"
396-
log.info("Task #%s: Analysis %s (id=%s, ip=%s)", completed_as, self.task_id, self.vmid, self.ipaddr)
396+
log.info("Task #%s: Analysis %s (id=%s, ip=%s)", self.task_id, completed_as, self.vmid, self.ipaddr)
397397
self.set_status_in_db("complete")
398398
return
399399
elif status["status"] == "exception":

lib/cuckoo/core/machinery_manager.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def __init__(self):
149149
self.machinery_name: str = self.cfg.cuckoo.machinery
150150
self.machinery: Machinery = self.create_machinery()
151151
self.pool_scaling_lock = threading.Lock()
152+
self.machines_limit: Optional[int] = None
152153
if self.machinery.module_name != self.machinery_name:
153154
raise CuckooCriticalError(
154155
f"Incorrect machinery module was imported. "
@@ -179,15 +180,14 @@ def create_machine_lock(self) -> MachineryLockType:
179180
# If the user wants to use the scaling bounded semaphore, check what machinery is specified, and then
180181
# grab the required configuration key for setting the upper limit
181182
machinery_opts = self.machinery.options.get(self.machinery_name)
182-
machines_limit: int = 0
183183
if self.machinery_name == "az":
184-
machines_limit = machinery_opts.get("total_machines_limit")
184+
self.machines_limit = machinery_opts.get("total_machines_limit")
185185
elif self.machinery_name == "aws":
186-
machines_limit = machinery_opts.get("dynamic_machines_limit")
187-
if machines_limit:
186+
self.machines_limit = machinery_opts.get("dynamic_machines_limit")
187+
if self.machines_limit:
188188
# The ScalingBoundedSemaphore is used to keep feeding available machines from the pending tasks queue
189-
log.info("upper limit for ScalingBoundedSemaphore = %d", machines_limit)
190-
retval = ScalingBoundedSemaphore(value=len(self.machinery.machines()), upper_limit=machines_limit)
189+
log.info("upper limit for ScalingBoundedSemaphore = %d", self.machines_limit)
190+
retval = ScalingBoundedSemaphore(value=len(self.machinery.machines()), upper_limit=self.machines_limit)
191191
else:
192192
log.warning(
193193
"scaling_semaphore is set but the %s machinery does not set the machines limit. Ignoring scaling semaphore.",
@@ -295,6 +295,12 @@ def scale_pool(self, machine: Machine) -> None:
295295
self.machinery.scale_pool(machine)
296296

297297
def start_machine(self, machine: Machine) -> None:
298+
if (
299+
isinstance(self.machine_lock, ScalingBoundedSemaphore)
300+
and self.db.count_machines_running() <= self.machines_limit
301+
and self.machine_lock._value == 0
302+
):
303+
self.machine_lock.release()
298304
with self.machine_lock:
299305
self.machinery.start(machine.label)
300306

modules/machinery/az.py

+22-7
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
CuckooMachineError,
3434
CuckooOperationalError,
3535
)
36-
from lib.cuckoo.core.database import TASK_PENDING
36+
from lib.cuckoo.core.database import TASK_PENDING, Machine
3737

3838
# Only log INFO or higher from imported python packages
3939
logging.getLogger("adal-python").setLevel(logging.INFO)
@@ -104,16 +104,23 @@ class Azure(Machinery):
104104
WINDOWS_PLATFORM = "windows"
105105
LINUX_PLATFORM = "linux"
106106

107+
def set_options(self, options: dict) -> None:
108+
"""Set machine manager options.
109+
@param options: machine manager options dict.
110+
"""
111+
self.options = options
112+
107113
def _initialize(self):
108114
"""
109115
Overloading abstracts.py:_initialize()
110116
Read configuration.
111117
@param module_name: module name
112118
@raise CuckooDependencyError: if there is a problem with the dependencies call
113119
"""
120+
# Using "scale_sets" here instead of "machines" to avoid KeyError
114121
mmanager_opts = self.options.get(self.module_name)
115122
if not isinstance(mmanager_opts["scale_sets"], list):
116-
mmanager_opts["scale_sets"] = mmanager_opts["scale_sets"].strip().split(",")
123+
mmanager_opts["scale_sets"] = str(mmanager_opts["scale_sets"]).strip().split(",")
117124

118125
# Replace a list of IDs with dictionary representations
119126
scale_sets = mmanager_opts.pop("scale_sets")
@@ -238,7 +245,7 @@ def _thr_machine_pool_monitor(self):
238245
threading.Thread(target=self._thr_scale_machine_pool, args=(vals["tag"],)).start()
239246

240247
# Check the machine pools every 5 minutes
241-
threading.Timer(300, self._thr_machine_pool_monitor).start()
248+
threading.Timer(self.options.az.monitor_rate, self._thr_machine_pool_monitor).start()
242249

243250
def _set_vmss_stage(self):
244251
"""
@@ -461,8 +468,13 @@ def stop(self, label):
461468
time.sleep(5)
462469
with reimage_lock:
463470
label_in_reimage_vm_list = label in [f"{vm['vmss']}_{vm['id']}" for vm in reimage_vm_list]
471+
472+
def release(self, machine: Machine):
473+
vmss_name = machine.label.split("_")[0]
474+
if machine_pools[vmss_name]["is_scaling_down"]:
475+
self.delete_machine(machine.label)
464476
else:
465-
self.delete_machine(label)
477+
_ = super(Azure, self).release(machine)
466478

467479
def availables(self, label=None, platform=None, tags=None, arch=None, include_reserved=False, os_version=[]):
468480
"""
@@ -619,7 +631,7 @@ def delete_machine(self, label, delete_from_vmss=True):
619631
"""
620632
global vms_currently_being_deleted
621633

622-
_ = super(Azure, self).delete_machine(label)
634+
super(Azure, self).delete_machine(label)
623635

624636
if delete_from_vmss:
625637
vmss_name, instance_id = label.split("_")
@@ -716,7 +728,10 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
716728
managed_disk=vmss_managed_disk,
717729
# Ephemeral disk time
718730
caching="ReadOnly",
719-
diff_disk_settings=models.DiffDiskSettings(option="Local"),
731+
diff_disk_settings=models.DiffDiskSettings(
732+
option="Local",
733+
placement=self.options.az.ephemeral_os_disk_placement
734+
),
720735
)
721736
vmss_storage_profile = models.VirtualMachineScaleSetStorageProfile(
722737
image_reference=vmss_image_ref,
@@ -1110,7 +1125,7 @@ def _get_relevant_machines(self, tag):
11101125
"""
11111126
# The number of relevant machines are those from the list of locked and unlocked machines
11121127
# that have the correct tag in their name
1113-
return [machine for machine in self.db.list_machines() if tag in machine.label]
1128+
return [machine for machine in self.db.list_machines([tag])]
11141129

11151130
@staticmethod
11161131
def _wait_for_concurrent_operations_to_complete():

web/submission/views.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -537,15 +537,16 @@ def index(request, task_id=None, resubmit_hash=None):
537537
# load multi machinery tags:
538538
# Get enabled machinery
539539
machinery = cfg.cuckoo.get("machinery")
540+
machinery_tags = "scale_sets" if machinery == "az" else "machines"
540541
if machinery == "multi":
541542
for mmachinery in Config(machinery).multi.get("machinery").split(","):
542-
vms = [x.strip() for x in getattr(Config(mmachinery), mmachinery).get("machines").split(",") if x.strip()]
543+
vms = [x.strip() for x in getattr(Config(mmachinery), mmachinery).get(machinery_tags).split(",") if x.strip()]
543544
if any(["tags" in list(getattr(Config(mmachinery), vmtag).keys()) for vmtag in vms]):
544545
enabledconf["tags"] = True
545546
break
546547
else:
547548
# Get VM names for machinery config elements
548-
vms = [x.strip() for x in str(getattr(Config(machinery), machinery).get("machines")).split(",") if x.strip()]
549+
vms = [x.strip() for x in str(getattr(Config(machinery), machinery).get(machinery_tags)).split(",") if x.strip()]
549550
# Check each VM config element for tags
550551
if any(["tags" in list(getattr(Config(machinery), vmtag).keys()) for vmtag in vms]):
551552
enabledconf["tags"] = True

0 commit comments

Comments
 (0)