cortex: Add error handling and retry
Add error handling for the Cortex API. Adopt retrying based on a requests
session retry adapter analogous to the Cuckoo module. With this change
we correctly handle unavailability of the Cortex API at startup, during
submit and when tracking job status. At startup and submit we give up
after a number of retries with increasing backoff and return an error.
When tracking jobs the same happens, but the error is ignored and job
status retrieval is retried on the next iteration because we have no
alternative but to limp on. If unavailability continues, the job will
eventually run into the maximum_job_age timeout and be reported as
failed to the client (if they're even still interested in it and haven't
dropped the connection already).

Requires a current cortex4py module that allows supplying a custom
session object - see
https://github.com/michaelweiser/Cortex4py/tree/session for now.
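
For illustration, here is a minimal standalone sketch of the session retry
setup this change adopts (the values and the commented-out Api call are
illustrative only; the session keyword argument for cortex4py.api.Api exists
only in the patched branch linked above):

    import requests
    import requests.adapters
    import urllib3.util.retry

    # illustrative values; the new parameters default to retries=5, backoff=0.5
    retries = 5
    backoff = 0.5

    # retry all methods (method_whitelist=False also covers POST) with
    # exponential backoff; the first sleeps between attempts come out to
    # 0, 1, 2 and 4 seconds for these values
    # (urllib3 1.26+ renamed method_whitelist to allowed_methods)
    retry_config = urllib3.util.retry.Retry(
        total=retries, backoff_factor=backoff, method_whitelist=False)

    adapter = requests.adapters.HTTPAdapter(max_retries=retry_config)
    session = requests.Session()
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    # hand the session to cortex4py (requires the session-enabled branch):
    # api = cortex4py.api.Api('http://localhost:9001', '<api token>',
    #                         session=session)

The WhitelistRetry class in the diff below extends this with a status code
whitelist and an abort event so that a shutdown request can cut the retry
chain short.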
michaelweiser committed Oct 20, 2020
1 parent 46ce5aa commit 8964236
Showing 1 changed file with 95 additions and 7 deletions.
102 changes: 95 additions & 7 deletions peekaboo/toolbox/cortex.py
@@ -32,6 +32,8 @@

import cortex4py.api
import cortex4py.exceptions
import requests
import urllib3.util.retry

from peekaboo.exceptions import PeekabooException

@@ -309,11 +311,47 @@ def analyzer(self):
return self.__analyzer


class WhitelistRetry(urllib3.util.retry.Retry):
""" A Retry class which has a status code whitelist, allowing to retry all
requests not whitelisted in a hard-core, catch-all manner. """
def __init__(self, status_whitelist=None, abort=None, **kwargs):
super().__init__(**kwargs)
self.status_whitelist = status_whitelist or set()
# Event that is set if we're not to retry
self.abort = abort

def new(self, **kwargs):
""" Adjusted shallow copy method to carry our parameters over into our
copy. """
if 'status_whitelist' not in kwargs:
kwargs['status_whitelist'] = self.status_whitelist
if 'abort' not in kwargs:
kwargs['abort'] = self.abort
return super().new(**kwargs)

def is_exhausted(self):
""" Allow to abort a retry chain through an external signal. """
if self.abort and self.abort.is_set():
return True

return super().is_exhausted()

def is_retry(self, method, status_code, has_retry_after=False):
""" Override Retry's is_retry to introduce our status whitelist logic.
"""
# we retry all methods so no check if method is retryable here

if self.status_whitelist and status_code not in self.status_whitelist:
return True

return super().is_retry(method, status_code, has_retry_after)


class Cortex:
""" Interfaces with a Cortex installation via its REST API. """
def __init__(self, job_queue, url="http://localhost:9001", api_token="",
poll_interval=5, submit_original_filename=True,
max_job_age=900):
max_job_age=900, retries=5, backoff=0.5):
""" Initialize the object.
@param job_queue: The job queue to use from now on
@@ -331,6 +369,10 @@ def __init__(self, job_queue, url="http://localhost:9001", api_token="",
@param max_job_age: How long to track jobs before declaring them
failed.
@type max_job_age: int (seconds)
@param retries: Number of retries on API requests
@type retries: int
@param backoff: Backoff factor for urllib3
@type backoff: float
"""
self.job_queue = job_queue
self.shutdown_requested = threading.Event()
@@ -344,6 +386,35 @@ def __init__(self, job_queue, url="http://localhost:9001", api_token="",
self.submit_original_filename = submit_original_filename
self.max_job_age = max_job_age

# urllib3 backoff formula:
# <backoff factor> * (2 ^ (<retry count so far> - 1))
# with second try intentionally having no sleep,
# e.g. with retry count==5 and backoff factor==0.5:
# try 1: fail, sleep(0.5*2^(1-1)==0.5*2^0==0.5*1==0.5->intentionally
# overridden to 0)
# try 2: fail, sleep(0.5*2^(2-1)==0.5*2^1==1)
# try 3: fail, sleep(0.5*2^(3-1)==0.5*2^2==2)
# try 4: fail, sleep(0.5*2^(4-1)==0.5*2^3==4)
# try 5: fail, abort, sleep would've been 8 before try 6
#
# Also, use method_whitelist=False to enable POST and other methods for
# retry which aren't by default because they're not considered
# idempotent. We assume that with the REST API a request either
# succeeds or fails without residual effects, making them atomic and
# idempotent.
#
# And finally we retry everything but a 200 response, which admittedly
# is a bit hard-core but serves our purposes for now.
retry_config = WhitelistRetry(total=retries,
backoff_factor=backoff,
method_whitelist=False,
status_whitelist=set([200]),
abort=self.shutdown_requested)
retry_adapter = requests.adapters.HTTPAdapter(max_retries=retry_config)
self.session = requests.session()
self.session.mount('http://', retry_adapter)
self.session.mount('https://', retry_adapter)

self.api = None
self.tracker = None

@@ -416,8 +487,8 @@ def resubmit_with_analyzer_report(self, job_id):
# register this job's analysis report with our main report object
job.sample.cortex_report.register_report(
job.analyzer, report.report)
# FIXME: More error handling and retrying here
except cortex4py.exceptions.CortexException:
except cortex4py.exceptions.CortexException as error:
logger.error('Retrieval of report from Cortex failed: %s', error)
# mark analysis as failed if we could not get the report e.g.
# because it was corrupted or the API connection failed.
job.sample.mark_cortex_failure()
@@ -468,7 +539,12 @@ def submit(self, sample, analyzer):

logger.debug("Creating Cortex job with analyzer %s and "
"parameters %s", analyzer.name, params)
job = self.api.analyzers.run_by_name(analyzer.name, params)
try:
job = self.api.analyzers.run_by_name(analyzer.name, params)
except cortex4py.exceptions.CortexException as error:
raise CortexSubmitFailedException(
'Error submitting Cortex job: %s' % error)

self.register_running_job(job.id, CortexJob(sample, job, analyzer))
return job.id

@@ -478,8 +554,13 @@ def start_tracker(self):
# API to be reachable at startup (with the usual retries to account for
# a bit of a race condition in parallel startup) but later on hope
# that all errors are transient and retry endlessly
self.api = cortex4py.api.Api(self.url, self.api_token)
self.api.analyzers.find_all({}, range='all')
self.api = cortex4py.api.Api(
self.url, self.api_token, session=self.session)
try:
self.api.analyzers.find_all({}, range='all')
except cortex4py.exceptions.CortexException as error:
logger.error('Initial connection to Cortex API failed: %s', error)
return False

self.tracker = threading.Thread(target=self.track,
name="CortexJobTracker")
@@ -505,7 +586,14 @@ def track(self):
# only meant to iterate over the job list in blocks but not to
# return data about a specific range of job IDs from that list.
for job_id in running_jobs:
cortexjob = self.api.jobs.get_by_id(job_id)
cortexjob = None
try:
cortexjob = self.api.jobs.get_by_id(job_id)
except cortex4py.exceptions.CortexException as error:
logger.error('Querying Cortex job status failed: %s', error)
# ignore and retry on next polling run
continue

if cortexjob.status in ['Success']:
self.resubmit_with_analyzer_report(job_id)
continue
