diff --git a/analyzers/Yara/Yara.json b/analyzers/Yara/Yara.json
index 323998152..64aa794a4 100644
--- a/analyzers/Yara/Yara.json
+++ b/analyzers/Yara/Yara.json
@@ -25,10 +25,18 @@
     },
     {
       "name": "github_token",
-      "description": "PAT (recommended) in case of private repository or high frequency of pulls/executions",
+      "description": "GitHub Personal Access Token (PAT); recommended for private repositories or a high frequency of pulls/executions",
       "type": "string",
       "multi": false,
       "required": false
+    },
+    {
+      "name": "files_limit",
+      "description": "Maximum number of YARA rule files downloaded or tested against the analyzed file. Adjust with care, as higher values may increase analysis time and resource usage on your Cortex instance.",
+      "type": "number",
+      "multi": false,
+      "required": false,
+      "defaultValue": 400
     }
   ]
 }
diff --git a/analyzers/Yara/yara_analyzer.py b/analyzers/Yara/yara_analyzer.py
index e0a059dc8..7293f8d80 100755
--- a/analyzers/Yara/yara_analyzer.py
+++ b/analyzers/Yara/yara_analyzer.py
@@ -96,15 +96,10 @@ def extract_github_info(url):
 
 
 class YaraAnalyzer(Analyzer):
-    def download_rules_from_github_url(self, url, token):
+    def download_rules_from_github_url(self, url, token, limit=None):
         """
-        Download .yar rule files from a GitHub URL formatted like /tree/main[/optional/subdir]
-        using the GitHub API and a PAT. Throws an error via self.error if the repository cannot
-        be accessed or if a file download fails.
-
-        :param url: GitHub URL e.g. 'https://github.com/owner/repo/tree/main/subdir'
-        :param token: Personal Access Token with repo scope
-        :return: List of paths to the downloaded rule files
+        Downloads up to 'limit' .yar or .yara files from the given GitHub URL.
+        If limit is None, downloads all matching files.
         """
         info = extract_github_info(url)
         if not info:
@@ -112,36 +107,41 @@ def download_rules_from_github_url(self, url, token):
         repo_identifier = info["repo"]
         branch = info["branch"]
-        path = info["path"]  # May be empty if no subdirectory is specified
+        directory = info["path"]
 
-        downloaded_rule_files = []
-        headers = {"Authorization": f"token {token}"}
+        if directory and not directory.endswith('/'):
+            directory += '/'
 
-        # Build the API URL: if a subdirectory is provided, include it in the URL
-        api_url = f"https://api.github.com/repos/{repo_identifier}/contents"
-        if path:
-            api_url += f"/{path}"
-        api_url += f"?ref={branch}"
+        headers = {}
+        if token:
+            headers["Authorization"] = f"token {token}"
 
+        api_url = f"https://api.github.com/repos/{repo_identifier}/git/trees/{branch}?recursive=1"
         response = requests.get(api_url, headers=headers)
         if response.status_code != 200:
-            self.error(f"Error accessing repository contents: {response.status_code} - {response.text}")
-
-        contents = response.json()
-        # Normalize to a list if a single file is returned
-        if isinstance(contents, dict):
-            contents = [contents]
-        for item in contents:
-            if item["name"].endswith(".yar"):
-                download_url = item.get("download_url")
-                if download_url:
-                    file_response = requests.get(download_url, headers=headers)
-                    if file_response.status_code != 200:
-                        self.error(f"Error downloading file {item['name']}: {file_response.status_code}")
-                    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".yar")
-                    tmp_file.write(file_response.content)
-                    tmp_file.close()
-                    downloaded_rule_files.append(tmp_file.name)
+            self.error(f"Error accessing repository tree: {response.status_code} - {response.text}")
+
+        tree = response.json().get("tree", [])
+        # Collect matching rule paths, stopping early once the limit is reached
+        rule_files = []
+        for item in tree:
+            if item["type"] == "blob" and item["path"].startswith(directory) and \
item["path"].endswith((".yar", ".yara")): + rule_files.append(item["path"]) + if limit is not None and len(rule_files) >= limit: + break + + downloaded_rule_files = [] + for file_path in rule_files: + download_url = f"https://raw.githubusercontent.com/{repo_identifier}/{branch}/{file_path}" + file_response = requests.get(download_url, headers=headers) + if file_response.status_code != 200: + self.error(f"Error downloading file {file_path}: {file_response.status_code}") + ext = os.path.splitext(file_path)[1] + tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext) + tmp_file.write(file_response.content) + tmp_file.close() + downloaded_rule_files.append(tmp_file.name) return downloaded_rule_files @@ -149,75 +149,79 @@ def download_rules_from_github_url(self, url, token): def __init__(self): Analyzer.__init__(self) + # Get local rules self.rulepaths = self.get_param('config.rules', [], 'No paths for rules provided.') if not self.rulepaths: - self.rulepaths = [] # Ensure it's a list even if nothing was provided + self.rulepaths = [] elif isinstance(self.rulepaths, str): self.rulepaths = [self.rulepaths] - - # Filter out any None values from the list - self.rulepaths = [rp for rp in self.rulepaths if rp is not None and rp != ''] + self.rulepaths = [rp for rp in self.rulepaths if rp] - self.github_urls = self.get_param('config.github_urls', None, 'No GitHub URLs provided.') - self.github_token = self.get_param('config.github_token', None, 'No GitHub PAT provided.') + # Get GitHub configuration + self.github_urls = self.get_param('config.github_urls', None) + self.github_token = self.get_param('config.github_token', None) + self.files_limit = self.get_param('config.files_limit', None) - self.ruleset = [] - self.ignored_rules = [] + # Global list of rule files + rule_files = [] + # Add local rule files (both .yar and .yara) for rulepath in self.rulepaths: - if os.path.isfile(rulepath) and rulepath.endswith('.yar'): - try: - compiled_ruleset = yara.compile(filepath=rulepath) - rule_names = extract_rule_names_from_file(rulepath) - self.ruleset.append({ - "compiled": compiled_ruleset, - "rule_names": rule_names, - "source": rulepath - }) - except (yara.SyntaxError, yara.Error, Exception) as e: - error_msg = f"Failed to load YARA rule file {rulepath} - {str(e)}" - print(f"Warning: {error_msg}") - self.ignored_rules.append({"source": rulepath, "error": str(e)}) - + if os.path.isfile(rulepath) and rulepath.endswith((".yar", ".yara")): + rule_files.append(rulepath) elif os.path.isdir(rulepath): - rule_files = [os.path.join(rulepath, f) for f in os.listdir(rulepath) if f.endswith('.yar')] - - if rule_files: - for rule_file in rule_files: - try: - compiled_ruleset = yara.compile(filepath=rule_file) - rule_names = extract_rule_names_from_file(rule_file) - self.ruleset.append({ - "compiled": compiled_ruleset, - "rule_names": rule_names, - "source": rule_file - }) - except (yara.SyntaxError, yara.Error, Exception) as e: - error_msg = f"Failed to load YARA rule file {rule_file} - {str(e)}" - print(f"Warning: {error_msg}") - self.ignored_rules.append({"source": rule_file, "error": str(e)}) - else: - print(f"Warning: No .yar files found in directory {rulepath}") + local_files = [os.path.join(rulepath, f) + for f in os.listdir(rulepath) + if f.endswith((".yar", ".yara"))] + rule_files.extend(local_files) + else: + print(f"Warning: {rulepath} is not a valid file or directory.") + + # Convert files_limit to integer if provided + global_limit = None + if self.files_limit: + try: + global_limit = 
+                global_limit = int(self.files_limit)
+            except ValueError:
+                self.error("Invalid files_limit value; it should be an integer.")
+
+        # If a global limit is set, take only as many local files as needed
+        if global_limit is not None and len(rule_files) >= global_limit:
+            rule_files = rule_files[:global_limit]
+            remaining = 0
+        else:
+            remaining = None if global_limit is None else global_limit - len(rule_files)
 
+        # For each GitHub URL, download only until the limit is reached
         if self.github_urls and self.github_token:
             for url in self.github_urls:
-                github_rule_files = self.download_rules_from_github_url(url, self.github_token)
-                for rule_file in github_rule_files:
-                    try:
-                        compiled_ruleset = yara.compile(filepath=rule_file)
-                        rule_names = extract_rule_names_from_file(rule_file)
-                        self.ruleset.append({
-                            "compiled": compiled_ruleset,
-                            "rule_names": rule_names,
-                            "source": rule_file
-                        })
-                    except (yara.SyntaxError, yara.Error, Exception) as e:
-                        error_msg = f"Failed to load YARA rule file {rule_file} - {str(e)}"
-                        print(f"Warning: {error_msg}")
-                        self.ignored_rules.append({"source": rule_file, "error": str(e)})
+                if remaining is not None and remaining <= 0:
+                    break
+                github_files = self.download_rules_from_github_url(url, self.github_token, limit=remaining)
+                rule_files.extend(github_files)
+                if remaining is not None:
+                    remaining = global_limit - len(rule_files)
+
+        # Compile the collected rule files
+        self.ruleset = []
+        self.ignored_rules = []
+        for rule_file in rule_files:
+            try:
+                compiled_ruleset = yara.compile(filepath=rule_file)
+                rule_names = extract_rule_names_from_file(rule_file)
+                self.ruleset.append({
+                    "compiled": compiled_ruleset,
+                    "rule_names": rule_names,
+                    "source": rule_file
+                })
+            except (yara.SyntaxError, yara.Error, Exception) as e:
+                error_msg = f"Failed to load YARA rule file {rule_file} - {str(e)}"
+                print(f"Warning: {error_msg}")
+                self.ignored_rules.append({"source": rule_file, "error": str(e)})
 
         if not self.ruleset:
             print("Warning: No valid YARA rules were loaded.")
+
     def check(self, file_path):
@@ -323,11 +327,12 @@ def run(self):
                 "rules_tested": sum(len(rule_obj.get("rule_names", [])) for rule_obj in self.ruleset),
                 "rulenames": rule_names,
                 "total_yar_files": len(self.ruleset),
-                "ignored_rules": self.ignored_rules
+                "ignored_rules": self.ignored_rules,
+                "files_limit": self.files_limit
             }
             self.report(output)
         else:
-            self.error('Wrong data type.')
+            self.error('Wrong data type. Expected file.')
 
 
 if __name__ == '__main__':
diff --git a/thehive-templates/Yara_3_0/long.html b/thehive-templates/Yara_3_0/long.html
index 0c8c63a0d..228813705 100644
--- a/thehive-templates/Yara_3_0/long.html
+++ b/thehive-templates/Yara_3_0/long.html
@@ -1,6 +1,6 @@