Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Umbrella.py #1330

Merged
merged 1 commit into from
Feb 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 59 additions & 24 deletions analyzers/Umbrella/Umbrella.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# encoding: utf-8
import json
import requests
from base64 import b64encode
from cortexutils.analyzer import Analyzer

class UmbrellaAnalyzer(Analyzer):
Expand All @@ -12,45 +13,79 @@ def __init__(self):
self.api_secret = self.get_param('config.api_secret', None, 'api_secret is missing')
self.organization_id = self.get_param('config.organization_id', None, 'organization_id is missing')
self.query_limit = str(self.get_param('config.query_limit', 20))
self.token = None

def umbrella_runreport(self, destination):
base_url = "https://reports.api.umbrella.com/v1/organizations"
url = "{}/{}/destinations/{}/activity?limit={}".format(base_url,self.organization_id,destination,self.query_limit)
try:
r = requests.get(url, auth=(self.api_key, self.api_secret))
if r.status_code == 200:
return json.loads(r.text)
else:
self.error('API query failed. Check parameters.')
except Exception as e:
self.unexpectedError(e)
token = self.get_bearer_token()
headers = {
'Authorization': f'Bearer {self.token}',
'Content-Type': 'application/json'
}

report_url = f"https://reports.api.umbrella.com/v2/organizations/{self.organization_id}/activity?from=-7days&to=now&domains={destination}&limit={self.query_limit}"

response = requests.get(report_url, headers=headers)
print(response)
if response.status_code == 200:
return json.loads(response.text)
else:
print(f"Failed to get categories: {response.text}")
return None

def get_bearer_token(self):
auth_url = "https://api.umbrella.com/auth/v2/token"
credentials = f"{self.api_key}:{self.api_secret}"
encoded_credentials = b64encode(credentials.encode()).decode()

headers = {
'Authorization': f'Basic {encoded_credentials}',
'Content-Type': 'application/json'
}

response = requests.post(auth_url, headers=headers)
if response.status_code == 200:
token_data = response.json()
self.token = token_data['access_token']
#print(self.token)
return self.token
else:
print(f"Failed to get bearer token: {response.text}")
return None

def summary(self, raw):
taxonomies = []
level = "info"
namespace = "Umbrella"
predicate = "GetReport"
value = "0"

if len(raw['requests']) > 0:
taxonomies.append(self.build_taxonomy(
'info',
'Umbrella',
'Hits',
'True'))
else:
taxonomies.append(self.build_taxonomy(
'info',
'Umbrella',
'Hits',
'False'))
if "data" in raw and len(raw["data"]) > 0:
item = raw["data"][0]
if "verdict" in item:
verdicts = item['verdict']
value = "{}".format(verdicts)

if verdicts.lower() in ["allowed", "passed", "none"]:
level = "safe"
elif verdicts.lower() in ["blocked", "rejected", "failed"]:
level = "malicious"
else:
level = "suspicious"

taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))

return {'taxonomies': taxonomies}



def run(self):
# Map The Hive observable types to Umbrella observable types
observable_mapping = {
"domain": "domain",
"fqdn": "domain",
"fqdn": "domain",
}



if self.service == 'get':
dataType = self.get_param("dataType")

Expand Down