Skip to content

Commit

Permalink
Merge pull request #262 from CybercentreCanada/configuration-notifica…
Browse files Browse the repository at this point in the history
…tions

Configuration notifications
  • Loading branch information
cccs-rs authored Sep 13, 2021
2 parents e4f293a + 1c8de2b commit cab5199
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 6 deletions.
40 changes: 39 additions & 1 deletion assemblyline_ui/api/v4/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from assemblyline.common.dict_utils import get_recursive_delta
from assemblyline.odm.models.heuristic import Heuristic
from assemblyline.odm.models.service import Service
from assemblyline.odm.messages.changes import Operation
from assemblyline.remote.datatypes import get_client
from assemblyline.remote.datatypes.events import EventSender
from assemblyline.remote.datatypes.hash import Hash
from assemblyline_core.updater.helper import get_latest_tag_for_service
from assemblyline_ui.api.base import api_login, make_api_response, make_file_response, make_subapi_blueprint
Expand All @@ -32,6 +34,10 @@
private=False,
))

event_sender = EventSender('changes.services',
host=config.core.redis.nonpersistent.host,
port=config.core.redis.nonpersistent.port)


def check_private_keys(source_list):
# Check format of private_key(if any) in sources
Expand Down Expand Up @@ -148,6 +154,12 @@ def add_service(**_):
if not STORAGE.service_delta.get_if_exists(service.name):
STORAGE.service_delta.save(service.name, {'version': service.version})
STORAGE.service_delta.commit()

# Notify components watching for service config changes
event_sender.send(service.name, {
'operation': Operation.Added,
'name': service.name
})

new_heuristics = []
if heuristics:
Expand Down Expand Up @@ -258,6 +270,12 @@ def restore(**_):
STORAGE.service.save(v_id, v_data)
STORAGE.service_delta.save(service_name, service['config'])

# Notify components watching for service config changes
event_sender.send(service_name, {
'operation': Operation.Added if old_service else Operation.Removed,
'name': service_name
})

# Grab the new value for the service
new_service = STORAGE.get_service_with_delta(service_name, as_obj=False)

Expand Down Expand Up @@ -494,6 +512,13 @@ def remove_service(servicename, **_):
if not STORAGE.service.delete_by_query(f"id:{servicename}*"):
success = False
STORAGE.heuristic.delete_by_query(f"{servicename.upper()}*")

# Notify components watching for service config changes
event_sender.send(servicename, {
'operation': Operation.Removed,
'name': servicename
})

return make_api_response({"success": success})
else:
return make_api_response({"success": False},
Expand Down Expand Up @@ -570,7 +595,16 @@ def set_service(servicename, **_):
c_srcs = STORAGE.get_service_with_delta(servicename, as_obj=False).get('update_config', {}).get('sources', [])
removed_sources = synchronize_sources(servicename, c_srcs, delta["update_config"]["sources"])

return make_api_response({"success": STORAGE.service_delta.save(servicename, delta),
# Notify components watching for service config changes
success = STORAGE.service_delta.save(servicename, delta)

if success:
event_sender.send(servicename, {
'operation': Operation.Modified,
'name': servicename
})

return make_api_response({"success": success,
"removed_sources": removed_sources})


Expand Down Expand Up @@ -605,6 +639,10 @@ def update_service(**_):
operations = [(STORAGE.service_delta.UPDATE_SET, 'version',
data['update_data']['latest_tag'].replace('stable', ''))]
if STORAGE.service_delta.update(data['name'], operations):
event_sender.send(data['name'], {
'operation': Operation.Modified,
'name': data['name']
})
return make_api_response({'success': True, 'status': "updated"})

service_update.set(data['name'], data['update_data'])
Expand Down
63 changes: 58 additions & 5 deletions assemblyline_ui/api/v4/signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from assemblyline.common import forge
from assemblyline.common.isotime import iso_to_epoch, now_as_iso
from assemblyline.common.memory_zip import InMemoryZip
from assemblyline.odm.messages.changes import Operation
from assemblyline.odm.models.signature import DEPLOYED_STATUSES, STALE_STATUSES, DRAFT_STATUSES
from assemblyline.remote.datatypes import get_client
from assemblyline.remote.datatypes.hash import Hash
from assemblyline.remote.datatypes.lock import Lock
from assemblyline.remote.datatypes.events import EventSender
from assemblyline_ui.api.base import api_login, make_api_response, make_file_response, make_subapi_blueprint
from assemblyline_ui.config import LOGGER, STORAGE

Expand All @@ -20,6 +22,13 @@

DEFAULT_CACHE_TTL = 24 * 60 * 60 # 1 Day

event_sender = EventSender('changes.signatures',
host=config.core.redis.nonpersistent.host,
port=config.core.redis.nonpersistent.port)
service_event_sender = EventSender('changes.services',
host=config.core.redis.nonpersistent.host,
port=config.core.redis.nonpersistent.port)


def _reset_service_updates(signature_type):
service_updates = Hash('service-updates', get_client(
Expand Down Expand Up @@ -103,7 +112,16 @@ def add_update_signature(**_):
data['stats'] = old['stats']

# Save the signature
return make_api_response({"success": STORAGE.signature.save(key, data), "id": key})
success = STORAGE.signature.save(key, data)
if success:
event_sender.send(data['type'], {
'signature_id': data['signature_id'],
'signature_type': data['type'],
'source': data['source'],
'operation': Operation.Modified if old else Operation.Added
})

return make_api_response({"success": success, "id": key})


@signature_api.route("/add_update_many/", methods=["POST", "PUT"])
Expand Down Expand Up @@ -178,6 +196,14 @@ def add_update_many_signature(**_):

if not plan.empty:
res = STORAGE.signature.bulk(plan)

event_sender.send(sig_type, {
'signature_id': '*',
'signature_type': sig_type,
'source': source,
'operation': Operation.Modified
})

return make_api_response({"success": len(res['items']), "errors": res['errors'], "skipped": skip_list})

return make_api_response({"success": 0, "errors": [], "skipped": skip_list})
Expand Down Expand Up @@ -249,7 +275,13 @@ def add_signature_source(service, **_):
_reset_service_updates(service)

# Save the signature
return make_api_response({"success": STORAGE.service_delta.save(service, service_delta)})
success = STORAGE.service_delta.save(service, service_delta)
if success:
service_event_sender.send(data['name'], {
'operation': Operation.Modified,
'name': data['name']
})
return make_api_response({"success": success})


# noinspection PyPep8Naming
Expand Down Expand Up @@ -318,7 +350,14 @@ def change_status(sid, status, **kwargs):

_reset_service_updates(data['type'])

return make_api_response({"success": STORAGE.signature.update(sid, operations)})
success = STORAGE.signature.update(sid, operations)
event_sender.send(data['type'], {
'signature_id': sid,
'signature_type': data['type'],
'source': data['source'],
'operation': Operation.Modified
})
return make_api_response({"success": success})
else:
return make_api_response("", f"Signature not found. ({sid})", 404)

Expand Down Expand Up @@ -351,6 +390,12 @@ def delete_signature(sid, **kwargs):
ret_val = STORAGE.signature.delete(sid)

_reset_service_updates(data['type'])
event_sender.send(data['type'], {
'signature_id': sid,
'signature_type': data['type'],
'source': data['source'],
'operation': Operation.Removed
})
return make_api_response({"success": ret_val})
else:
return make_api_response("", f"Signature not found. ({sid})", 404)
Expand Down Expand Up @@ -415,6 +460,10 @@ def delete_signature_source(service, name, **_):

_reset_service_updates(service)

service_event_sender.send(service, {
'operation': Operation.Modified,
'name': service
})
return make_api_response({"success": success})


Expand Down Expand Up @@ -655,9 +704,13 @@ def update_signature_source(service, name, **_):
operations=[("SET", "classification", class_norm)])

_reset_service_updates(service)

# Save the signature
return make_api_response({"success": STORAGE.service_delta.save(service, service_delta)})
success = STORAGE.service_delta.save(service, service_delta)
service_event_sender.send(service, {
'operation': Operation.Modified,
'name': service
})
return make_api_response({"success": success})


@signature_api.route("/stats/", methods=["GET"])
Expand Down

0 comments on commit cab5199

Please sign in to comment.