Merge pull request #338 from CybercentreCanada/persistent-service-update

Persistent service update (dev)

cccs-rs authored Aug 30, 2021
2 parents 7d0ec60 + a70633e · commit e2ac5e3

Showing 21 changed files with 213 additions and 40 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -84,3 +84,4 @@ venv.bak/
 
 # Cython debug symbols
 cython_debug/
+assemblyline/common/frequency.c
37 changes: 30 additions & 7 deletions assemblyline/cachestore/__init__.py
@@ -1,5 +1,6 @@
 
 import re
+from typing import AnyStr
 
 from assemblyline.common import forge
 from assemblyline.common.isotime import now_as_iso
@@ -10,9 +11,9 @@
 
 
 class CacheStore(object):
-    def __init__(self, component, config=None, datastore=None):
+    def __init__(self, component: str, config=None, datastore=None):
         if not component:
-            raise ValueError("Cannot instanciate a cachestore without providing a component name.")
+            raise ValueError("Cannot instantiate a cachestore without providing a component name.")
 
         if not COMPONENT_VALIDATOR.match(component):
             raise ValueError("Invalid component name. (Only letters, numbers, underscores and dots allowed)")
@@ -24,13 +25,13 @@ def __init__(self, component, config=None, datastore=None):
         self.datastore = datastore or forge.get_datastore(config=config)
         self.filestore = FileStore(*config.filestore.cache)
 
-    def __enter__(self):
+    def __enter__(self) -> 'CacheStore':
         return self
 
     def __exit__(self, ex_type, exc_val, exc_tb):
         self.filestore.close()
 
-    def save(self, cache_key, data, ttl=DEFAULT_CACHE_LEN, force=False):
+    def save(self, cache_key: str, data: AnyStr, ttl=DEFAULT_CACHE_LEN, force=False):
         if not COMPONENT_VALIDATOR.match(cache_key):
             raise ValueError("Invalid cache_key for cache item. "
                              "(Only letters, numbers, underscores and dots allowed)")
@@ -40,17 +41,39 @@ def save(self, cache_key, data, ttl=DEFAULT_CACHE_LEN, force=False):
         self.datastore.cached_file.save(new_key, {'expiry_ts': now_as_iso(ttl), 'component': self.component})
         self.filestore.put(new_key, data, force=force)
 
-    def get(self, cache_key):
+    def upload(self, cache_key: str, path: str, ttl=DEFAULT_CACHE_LEN):
+        if not COMPONENT_VALIDATOR.match(cache_key):
+            raise ValueError("Invalid cache_key for cache item. "
+                             "(Only letters, numbers, underscores and dots allowed)")
+
+        new_key = f"{self.component}_{cache_key}" if self.component else cache_key
+
+        self.datastore.cached_file.save(new_key, {'expiry_ts': now_as_iso(ttl), 'component': self.component})
+        self.filestore.upload(new_key, path, force=True)
+
+    def touch(self, cache_key: str, ttl=DEFAULT_CACHE_LEN):
+        if not COMPONENT_VALIDATOR.match(cache_key):
+            raise ValueError("Invalid cache_key for cache item. "
+                             "(Only letters, numbers, underscores and dots allowed)")
+        if not self.exists(cache_key):
+            raise KeyError(cache_key)
+
+        new_key = f"{self.component}_{cache_key}" if self.component else cache_key
+        self.datastore.cached_file.save(new_key, {'expiry_ts': now_as_iso(ttl), 'component': self.component})
+
+    def get(self, cache_key: str) -> bytes:
         new_key = f"{self.component}_{cache_key}" if self.component else cache_key
         return self.filestore.get(new_key)
 
-    def exists(self, cache_key):
+    def download(self, cache_key: str, path: str):
+        new_key = f"{self.component}_{cache_key}" if self.component else cache_key
+        return self.filestore.download(new_key, path)
+
+    def exists(self, cache_key: str):
         new_key = f"{self.component}_{cache_key}" if self.component else cache_key
         return self.filestore.exists(new_key)
 
-    def delete(self, cache_key, db_delete=True):
+    def delete(self, cache_key: str, db_delete=True):
         new_key = f"{self.component}_{cache_key}" if self.component else cache_key
 
         self.filestore.delete(new_key)
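
For orientation, a minimal usage sketch of the expanded CacheStore surface; the component name, cache keys, and paths are made up, and a configured Assemblyline datastore/filestore reachable through forge is assumed:

```
# Illustrative only: names and paths below are hypothetical.
from assemblyline.cachestore import CacheStore

with CacheStore('my_component') as cache:
    cache.save('results.latest', b'some cached bytes')    # buffer-based write
    cache.upload('rules.compiled', '/tmp/rules.bin')      # new: write from a file on disk
    cache.touch('rules.compiled', ttl=86400)              # new: refresh the expiry without rewriting
    data = cache.get('results.latest')                    # -> bytes
    cache.download('rules.compiled', '/tmp/rules.copy')   # new: read straight into a file
    cache.delete('results.latest')
```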
6 changes: 3 additions & 3 deletions assemblyline/datastore/helper.py
@@ -916,17 +916,17 @@ def get_attack_matrix_from_keys(self, keys):
         return out
 
     @elasticapm.capture_span(span_type='datastore')
-    def get_service_with_delta(self, service_name, version=None, as_obj=True):
+    def get_service_with_delta(self, service_name, version=None, as_obj=True) -> Union[Service, dict, None]:
         svc = self.ds.service_delta.get(service_name)
         if svc is None:
-            return svc
+            return None
 
         if version is not None:
             svc.version = version
 
         svc_version_data = self.ds.service.get(f"{service_name}_{svc.version}")
         if svc_version_data is None:
-            return svc_version_data
+            return None
 
         svc_version_data = recursive_update(svc_version_data.as_primitives(strip_null=True),
                                             svc.as_primitives(strip_null=True))
19 changes: 12 additions & 7 deletions assemblyline/filestore/__init__.py
@@ -1,6 +1,8 @@
+from __future__ import annotations
 
 import json
 import logging
+from typing import AnyStr, TYPE_CHECKING, List, Tuple
 from urllib.parse import urlparse, parse_qs, unquote
 
 import elasticapm
@@ -13,6 +15,9 @@
 from assemblyline.filestore.transport.s3 import TransportS3
 from assemblyline.filestore.transport.sftp import TransportSFTP
 
+if TYPE_CHECKING:
+    from assemblyline.filestore.transport.base import Transport
+
 
 class FileStoreException(Exception):
     pass
@@ -163,7 +168,7 @@ def close(self):
                 self.log.warning('Transport problem: %s', trace)
 
     @elasticapm.capture_span(span_type='filestore')
-    def delete(self, path, location='all'):
+    def delete(self, path: str, location='all'):
         with elasticapm.capture_span(name='delete', span_type='filestore', labels={'path': path}):
             for t in self.slice(location):
                 try:
@@ -173,7 +178,7 @@ def delete(self, path, location='all'):
                     self.log.info('Transport problem: %s', trace)
 
     @elasticapm.capture_span(span_type='filestore')
-    def download(self, src_path, dest_path, location='any'):
+    def download(self, src_path: str, dest_path: str, location='any'):
         successful = False
         transports = []
         download_errors = []
@@ -191,7 +196,7 @@ def download(self, src_path, dest_path, location='any'):
         return transports
 
     @elasticapm.capture_span(span_type='filestore')
-    def exists(self, path, location='any'):
+    def exists(self, path, location='any') -> List[Transport]:
         transports = []
         for t in self.slice(location):
             try:
@@ -205,7 +210,7 @@ def exists(self, path, location='any'):
         return transports
 
     @elasticapm.capture_span(span_type='filestore')
-    def get(self, path, location='any'):
+    def get(self, path: str, location='any') -> bytes:
         for t in self.slice(location):
             try:
                 if t.exists(path):
@@ -215,7 +220,7 @@ def get(self, path, location='any'):
                 self.log.warning('Transport problem: %s', trace)
 
     @elasticapm.capture_span(span_type='filestore')
-    def put(self, dst_path, content, location='all', force=False):
+    def put(self, dst_path: str, content: AnyStr, location='all', force=False) -> List[Transport]:
         transports = []
         for t in self.slice(location):
             if force or not t.exists(dst_path):
@@ -239,7 +244,7 @@ def slice(self, location):
         return transports
 
     @elasticapm.capture_span(span_type='filestore')
-    def upload(self, src_path, dst_path, location='all', force=False):
+    def upload(self, src_path: str, dst_path: str, location='all', force=False) -> List[Transport]:
         transports = []
         for t in self.slice(location):
            if force or not t.exists(dst_path):
@@ -251,7 +256,7 @@ def upload(self, src_path, dst_path, location='all', force=False):
         return transports
 
     @elasticapm.capture_span(span_type='filestore')
-    def upload_batch(self, local_remote_tuples, location='all'):
+    def upload_batch(self, local_remote_tuples, location='all') -> List[Tuple[str, str, str]]:
         failed_tuples = []
         for (src_path, dst_path) in local_remote_tuples:
             try:
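
A rough sketch of the FileStore surface annotated above, assuming the file:// scheme maps to the local transport (the URL and paths are placeholders): put/upload fan out to every transport in the given location, while get/download read from the first transport that has the file.

```
# Sketch only: a single local transport stands in for a real deployment.
from assemblyline.filestore import FileStore

fs = FileStore('file:///tmp/al_storage')
fs.put('hello.txt', b'hello world')           # returns the List[Transport] written to
if fs.exists('hello.txt'):                    # non-empty list when the file is present
    print(fs.get('hello.txt'))                # -> b'hello world'
fs.download('hello.txt', '/tmp/hello.copy')
fs.delete('hello.txt')
```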
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/azure.py
@@ -134,7 +134,7 @@ def upload(self, src_path, dst_path):
             raise
 
     # Buffer based functions
-    def get(self, path):
+    def get(self, path: str) -> bytes:
         key = self.normalize(path)
         my_blob = BytesIO()
 
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/ftp.py
@@ -196,7 +196,7 @@ def upload(self, src_path: str, dst_path: str):
 
     # Buffer based functions
     @reconnect_retry_on_fail
-    def get(self, path) -> bytes:
+    def get(self, path: str) -> bytes:
         path = self.normalize(path)
         bio = BytesIO()
         self.ftp.retrbinary('RETR ' + path, bio.write)
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/http.py
@@ -98,7 +98,7 @@ def upload_batch(self, local_remote_tuples):
         raise TransportException("READ ONLY TRANSPORT: Method not implemented")
 
     # Buffer based functions
-    def get(self, path):
+    def get(self, path: str) -> bytes:
         path = self.normalize(path)
         resp = self.session.get(path, auth=self.auth, cert=self.pki, verify=self.verify)
         if resp.ok:
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/local.py
@@ -81,7 +81,7 @@ def upload(self, src_path, dst_path):
         assert (self.exists(dst_path))
 
     # Buffer based functions
-    def get(self, path):
+    def get(self, path: str) -> bytes:
         path = self.normalize(path)
         fh = None
         try:
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/s3.py
@@ -154,7 +154,7 @@ def upload(self, src_path, dst_path):
         self.with_retries(self.client.upload_file, src_path, self.bucket, dst_path)
 
     # Buffer based functions
-    def get(self, path):
+    def get(self, path: str) -> bytes:
         fd, dst_path = tempfile.mkstemp(prefix="s3_transport.", suffix=".download")
         os.close(fd)  # We don't need the file descriptor open
 
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/sftp.py
@@ -136,7 +136,7 @@ def upload(self, src_path, dst_path):
 
     # Buffer based functions
     @reconnect_retry_on_fail
-    def get(self, path):
+    def get(self, path: str) -> bytes:
         path = self.normalize(path)
         bio = BytesIO()
         with self.sftp.open(path) as sftp_handle:
3 changes: 3 additions & 0 deletions assemblyline/odm/models/config.py
@@ -703,6 +703,8 @@ class Services(odm.Model):
     stages: List[str] = odm.List(odm.Keyword())
     # Substitution variables for image paths (for custom registry support)
     image_variables: Dict[str, str] = odm.Mapping(odm.Keyword(default=''))
+    # Same as above, but only applied in the updater, used in dev setups and local registries
+    update_image_variables: Dict[str, str] = odm.Mapping(odm.Keyword(default=''))
     # Default update channel to be used for new services
     preferred_update_channel: str = odm.Keyword()
     # Allow container registries with self signed certs for service updates
@@ -720,6 +722,7 @@
     "min_service_workers": 0,
     "stages": SERVICE_STAGES,
     "image_variables": {},
+    "update_image_variables": {},
     "preferred_update_channel": "stable",
     "allow_insecure_registry": False,
     "cpu_reservation": 0.25
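
The new field mirrors image_variables but is applied only by the updater, which suits dev setups pulling service images from a local registry. A hedged sketch of what such a substitution could look like; the actual mechanism lives in the updater/scaler and is not part of this diff, and the registry value is made up:

```
# Assumption: image names are treated as string templates with ${VAR} placeholders.
from string import Template

update_image_variables = {'REGISTRY': 'localhost:32000/'}
image = '${REGISTRY}cccs/assemblyline-service-base:latest'
print(Template(image).safe_substitute(update_image_variables))
# localhost:32000/cccs/assemblyline-service-base:latest
```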
33 changes: 17 additions & 16 deletions assemblyline/odm/models/service.py
@@ -9,8 +9,8 @@
 
 @odm.model(index=False, store=False)
 class EnvironmentVariable(odm.Model):
-    name = odm.Keyword()
-    value = odm.Keyword()
+    name: str = odm.Keyword()
+    value: str = odm.Keyword()
 
 
 @odm.model(index=False, store=False)
@@ -39,20 +39,21 @@ class PersistentVolume(odm.Model):
 class DependencyConfig(odm.Model):
     container = odm.Compound(DockerConfig)
     volumes = odm.Mapping(odm.Compound(PersistentVolume), default={})
+    run_as_core: bool = odm.Boolean(default=False)
 
 
 @odm.model(index=False, store=False)
 class UpdateSource(odm.Model):
-    name = odm.Keyword()
-    password = odm.Optional(odm.Keyword(default=""))
-    pattern = odm.Optional(odm.Keyword(default=""))
-    private_key = odm.Optional(odm.Keyword(default=""))
-    ca_cert = odm.Optional(odm.Keyword(default=""))
-    ssl_ignore_errors = odm.Boolean(default=False)
-    proxy = odm.Optional(odm.Keyword(default=""))
-    uri = odm.Keyword()
-    username = odm.Optional(odm.Keyword(default=""))
-    headers = odm.List(odm.Compound(EnvironmentVariable), default=[])
+    name: str = odm.Keyword()
+    password: Opt[str] = odm.Optional(odm.Keyword(default=""))
+    pattern: Opt[str] = odm.Optional(odm.Keyword(default=""))
+    private_key: Opt[str] = odm.Optional(odm.Keyword(default=""))
+    ca_cert: Opt[str] = odm.Optional(odm.Keyword(default=""))
+    ssl_ignore_errors: bool = odm.Boolean(default=False)
+    proxy: Opt[str] = odm.Optional(odm.Keyword(default=""))
+    uri: str = odm.Keyword()
+    username: Opt[str] = odm.Optional(odm.Keyword(default=""))
+    headers: list[EnvironmentVariable] = odm.List(odm.Compound(EnvironmentVariable), default=[])
     default_classification = odm.Classification(default=Classification.UNRESTRICTED)
 
 
@@ -62,9 +63,9 @@ class UpdateConfig(odm.Model):
     generates_signatures = odm.Boolean(index=True, default=False)
     method = odm.Enum(values=['run', 'build'])  # Are we going to run or build a container?
     run_options = odm.Optional(odm.Compound(DockerConfig))  # If we are going to run a container, which one?
-    sources = odm.List(odm.Compound(UpdateSource), default=[])  # Generic external resources we need
-    update_interval_seconds = odm.Integer()  # Update check interval in seconds
-    wait_for_update = odm.Boolean(default=False)
+    sources: list[UpdateSource] = odm.List(odm.Compound(UpdateSource), default=[])  # Generic external resources we need
+    update_interval_seconds: int = odm.Integer()  # Update check interval in seconds
+    wait_for_update: bool = odm.Boolean(default=False)
@@ -91,7 +92,7 @@ class Service(odm.Model):
     is_external = odm.Boolean(default=False)
     licence_count = odm.Integer(default=0)
 
-    name = odm.Keyword(store=True, copyto="__text__")
+    name: str = odm.Keyword(store=True, copyto="__text__")
     version = odm.Keyword(store=True)
 
     # Should the result cache be disabled for this service
1 change: 1 addition & 0 deletions assemblyline/odm/models/service_delta.py
@@ -48,6 +48,7 @@ class PersistentVolumeDelta(odm.Model):
 class DependencyConfigDelta(odm.Model):
     container = odm.Optional(odm.Compound(DockerConfigDelta))
     volumes = odm.Mapping(odm.Compound(PersistentVolumeDelta), default={})
+    run_as_core: bool = odm.Optional(odm.Boolean())
 
 
 @odm.model(index=False, store=False)
32 changes: 32 additions & 0 deletions dev/k8s/README.md
@@ -0,0 +1,32 @@
# Assemblyline Dev Setup (Kubernetes)
- Follow the steps in the [K8S appliance](https://github.com/CybercentreCanada/assemblyline-helm-chart/tree/master/appliance) guide for a local Kubernetes setup
- Enable the registry add-on for microk8s (other registries, such as Harbor, can be used but involve more setup that isn't covered here)
  - Test: `curl localhost:32000/v2/_catalog`
- When ready to build, run the local_dev_containers.sh script with a tag as its parameter.
- Run `helm install|upgrade` using the new tags in values.yaml.
- Use Lens or kubectl to monitor the status of the deployment.
- You can create local service-base images by passing an optional build-arg to a docker build command; otherwise the latest image will be pulled.
  - e.g. `docker build . -f service-base.Dockerfile --build-arg build_no=dev0`
- Debugging: VS Code's [Bridge to Kubernetes](https://marketplace.visualstudio.com/items?itemName=ms-azuretools.mindaro) &
  [Kubernetes](https://marketplace.visualstudio.com/items?itemName=ms-kubernetes-tools.vscode-kubernetes-tools) extensions
  - Add the following to settings.json (assuming microk8s was installed from a snap):
```
"vs-kubernetes": {
"vs-kubernetes.namespace": "al",
"vs-kubernetes.kubectl-path": "/snap/kubectl/current/kubectl",
"vs-kubernetes.helm-path": "/snap/helm/current/helm",
"vs-kubernetes.minikube-path": "/snap/bin/microk8s",
"vs-kubernetes.kubectlVersioning": "user-provided",
"vs-kubernetes.outputFormat": "yaml",
"vs-kubernetes.kubeconfig": "/var/snap/microk8s/current/credentials/client.config",
"vs-kubernetes.knownKubeconfigs": [],
"vs-kubernetes.autoCleanupOnDebugTerminate": false,
"vs-kubernetes.nodejs-autodetect-remote-root": true,
"vs-kubernetes.nodejs-remote-root": "",
"vs-kubernetes.nodejs-debug-port": 9229,
"vs-kubernetes.local-tunnel-debug-provider": "",
"checkForMinikubeUpgrade": false,
"imageBuildTool": "Docker"
}
```
- Specific to the updater/scaler: you need to provide an environment variable called `KUBECONFIG` in your launch targets that points to your kubeconfig file, as in the sketch below.
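
For example, a hypothetical launch.json entry might look like the following (the module name and kubeconfig path are illustrative assumptions; adjust them to your checkout and cluster):
```
{
    "name": "Python: Updater",
    "type": "python",
    "request": "launch",
    "module": "assemblyline_core.updater.run_updater",
    "env": {
        "KUBECONFIG": "/var/snap/microk8s/current/credentials/client.config"
    }
}
```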