Persistent service update (dev) #338

Merged · 24 commits · Aug 30, 2021

Commits
51578b6
init k8s dev setup
cccs-rs Aug 3, 2021
912df90
install socketio deps
cccs-rs Aug 3, 2021
725d451
root context isn't necessary for dev core build
cccs-rs Aug 3, 2021
2327e25
cleanup readme + link to appliance
cccs-rs Aug 3, 2021
8c5b40a
add registry note
cccs-rs Aug 3, 2021
2972183
add changes for dev service-base
cccs-rs Aug 3, 2021
cf19383
reference VS Code usage
cccs-rs Aug 4, 2021
653da6b
default to latest for local builds
cccs-rs Aug 4, 2021
5c1a6ab
Bridge to Kubernetes for debugging + build service
cccs-rs Aug 5, 2021
54a0e66
fix code block
cccs-rs Aug 5, 2021
a621cff
How-to run scaler/updater using k8s bridge
cccs-rs Aug 5, 2021
dd86627
Add cachestore methods for bulk files
cccs-douglass Jul 19, 2021
c4ab3fe
Filestore type annotations
cccs-douglass Jul 19, 2021
a9803f0
Add option for core dependency containers
cccs-douglass Jul 20, 2021
b1d243a
typo
cccs-douglass Jul 21, 2021
0ff6e56
Merge remote-tracking branch 'origin/k8s/dev_setup' into persistent-s…
cccs-douglass Aug 6, 2021
0daf8f7
let updater use a second set of variables
cccs-douglass Aug 9, 2021
2e72401
refactoring module to base
cccs-douglass Aug 13, 2021
8c363d9
changes for building services on dev containers
cccs-douglass Aug 13, 2021
6bdb89e
:Merge remote-tracking branch 'origin/master' into persistent-service…
cccs-douglass Aug 17, 2021
8b34d95
Add type annotations
cccs-douglass Aug 25, 2021
065e836
Changes to dev setup
cccs-douglass Aug 25, 2021
cf728e8
Undo refactor moving service base
cccs-douglass Aug 26, 2021
a70633e
include assemblyline_client for dev
cccs-rs Aug 30, 2021
1 change: 1 addition & 0 deletions .gitignore
@@ -84,3 +84,4 @@ venv.bak/

# Cython debug symbols
cython_debug/
assemblyline/common/frequency.c
37 changes: 30 additions & 7 deletions assemblyline/cachestore/__init__.py
@@ -1,5 +1,6 @@

import re
from typing import AnyStr

from assemblyline.common import forge
from assemblyline.common.isotime import now_as_iso
@@ -10,9 +11,9 @@


class CacheStore(object):
def __init__(self, component, config=None, datastore=None):
def __init__(self, component: str, config=None, datastore=None):
if not component:
raise ValueError("Cannot instanciate a cachestore without providing a component name.")
raise ValueError("Cannot instantiate a cachestore without providing a component name.")

if not COMPONENT_VALIDATOR.match(component):
raise ValueError("Invalid component name. (Only letters, numbers, underscores and dots allowed)")
@@ -24,13 +25,13 @@ def __init__(self, component, config=None, datastore=None):
self.datastore = datastore or forge.get_datastore(config=config)
self.filestore = FileStore(*config.filestore.cache)

def __enter__(self):
def __enter__(self) -> 'CacheStore':
return self

def __exit__(self, ex_type, exc_val, exc_tb):
self.filestore.close()

def save(self, cache_key, data, ttl=DEFAULT_CACHE_LEN, force=False):
def save(self, cache_key: str, data: AnyStr, ttl=DEFAULT_CACHE_LEN, force=False):
if not COMPONENT_VALIDATOR.match(cache_key):
raise ValueError("Invalid cache_key for cache item. "
"(Only letters, numbers, underscores and dots allowed)")
@@ -40,17 +41,39 @@ def save(self, cache_key, data, ttl=DEFAULT_CACHE_LEN, force=False):
self.datastore.cached_file.save(new_key, {'expiry_ts': now_as_iso(ttl), 'component': self.component})
self.filestore.put(new_key, data, force=force)

def get(self, cache_key):
def upload(self, cache_key: str, path: str, ttl=DEFAULT_CACHE_LEN):
if not COMPONENT_VALIDATOR.match(cache_key):
raise ValueError("Invalid cache_key for cache item. "
"(Only letters, numbers, underscores and dots allowed)")

new_key = f"{self.component}_{cache_key}" if self.component else cache_key

self.datastore.cached_file.save(new_key, {'expiry_ts': now_as_iso(ttl), 'component': self.component})
self.filestore.upload(new_key, path, force=True)

def touch(self, cache_key: str, ttl=DEFAULT_CACHE_LEN):
if not COMPONENT_VALIDATOR.match(cache_key):
raise ValueError("Invalid cache_key for cache item. "
"(Only letters, numbers, underscores and dots allowed)")
if not self.exists(cache_key):
raise KeyError(cache_key)

new_key = f"{self.component}_{cache_key}" if self.component else cache_key
self.datastore.cached_file.save(new_key, {'expiry_ts': now_as_iso(ttl), 'component': self.component})

def get(self, cache_key: str) -> bytes:
new_key = f"{self.component}_{cache_key}" if self.component else cache_key
return self.filestore.get(new_key)

def exists(self, cache_key):
def download(self, cache_key: str, path: str):
new_key = f"{self.component}_{cache_key}" if self.component else cache_key
return self.filestore.download(new_key, path)

def exists(self, cache_key: str):
new_key = f"{self.component}_{cache_key}" if self.component else cache_key
return self.filestore.exists(new_key)

def delete(self, cache_key, db_delete=True):
def delete(self, cache_key: str, db_delete=True):
new_key = f"{self.component}_{cache_key}" if self.component else cache_key

self.filestore.delete(new_key)
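The new `upload`, `touch`, and `download` methods round out the bulk-file workflow: `upload` and `download` move cached files by path instead of holding their bytes in memory, and `touch` refreshes an entry's expiry. A minimal usage sketch, assuming a running Assemblyline deployment whose config `forge` can load; the component name and paths are illustrative:

```python
from assemblyline.cachestore import CacheStore

# CacheStore is a context manager: __exit__ closes the underlying filestore
with CacheStore("my_component") as cache:
    # Stream a file from disk into the cache instead of passing raw bytes
    cache.upload("signatures.zip", "/tmp/signatures.zip", ttl=3600)

    # Extend the expiry of an existing entry; raises KeyError if it is absent
    cache.touch("signatures.zip", ttl=3600)

    # Retrieve the cached file straight to disk rather than into memory
    cache.download("signatures.zip", "/tmp/restored.zip")
```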
6 changes: 3 additions & 3 deletions assemblyline/datastore/helper.py
@@ -916,17 +916,17 @@ def get_attack_matrix_from_keys(self, keys):
return out

@elasticapm.capture_span(span_type='datastore')
def get_service_with_delta(self, service_name, version=None, as_obj=True):
def get_service_with_delta(self, service_name, version=None, as_obj=True) -> Union[Service, dict, None]:
svc = self.ds.service_delta.get(service_name)
if svc is None:
return svc
return None

if version is not None:
svc.version = version

svc_version_data = self.ds.service.get(f"{service_name}_{svc.version}")
if svc_version_data is None:
return svc_version_data
return None

svc_version_data = recursive_update(svc_version_data.as_primitives(strip_null=True),
svc.as_primitives(strip_null=True))
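The early exits above now return `None` explicitly, matching the widened `Union[Service, dict, None]` annotation. A brief caller-side sketch (assuming a forged datastore; the service name is illustrative):

```python
from assemblyline.common import forge

ds = forge.get_datastore()

# None is returned when either the service delta or the versioned
# service document is missing
svc = ds.get_service_with_delta("Extract", as_obj=False)
if svc is None:
    print("service not found or version data missing")
else:
    print(svc["name"], svc["version"])
```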
19 changes: 12 additions & 7 deletions assemblyline/filestore/__init__.py
@@ -1,6 +1,8 @@
from __future__ import annotations

import json
import logging
from typing import AnyStr, TYPE_CHECKING, List, Tuple
from urllib.parse import urlparse, parse_qs, unquote

import elasticapm
@@ -13,6 +15,9 @@
from assemblyline.filestore.transport.s3 import TransportS3
from assemblyline.filestore.transport.sftp import TransportSFTP

if TYPE_CHECKING:
from assemblyline.filestore.transport.base import Transport


class FileStoreException(Exception):
pass
@@ -163,7 +168,7 @@ def close(self):
self.log.warning('Transport problem: %s', trace)

@elasticapm.capture_span(span_type='filestore')
def delete(self, path, location='all'):
def delete(self, path: str, location='all'):
with elasticapm.capture_span(name='delete', span_type='filestore', labels={'path': path}):
for t in self.slice(location):
try:
@@ -173,7 +178,7 @@ def delete(self, path, location='all'):
self.log.info('Transport problem: %s', trace)

@elasticapm.capture_span(span_type='filestore')
def download(self, src_path, dest_path, location='any'):
def download(self, src_path: str, dest_path: str, location='any'):
successful = False
transports = []
download_errors = []
@@ -191,7 +196,7 @@ def download(self, src_path, dest_path, location='any'):
return transports

@elasticapm.capture_span(span_type='filestore')
def exists(self, path, location='any'):
def exists(self, path, location='any') -> List[Transport]:
transports = []
for t in self.slice(location):
try:
@@ -205,7 +210,7 @@ def exists(self, path, location='any'):
return transports

@elasticapm.capture_span(span_type='filestore')
def get(self, path, location='any'):
def get(self, path: str, location='any') -> bytes:
for t in self.slice(location):
try:
if t.exists(path):
@@ -215,7 +220,7 @@ def get(self, path, location='any'):
self.log.warning('Transport problem: %s', trace)

@elasticapm.capture_span(span_type='filestore')
def put(self, dst_path, content, location='all', force=False):
def put(self, dst_path: str, content: AnyStr, location='all', force=False) -> List[Transport]:
transports = []
for t in self.slice(location):
if force or not t.exists(dst_path):
@@ -239,7 +244,7 @@ def slice(self, location):
return transports

@elasticapm.capture_span(span_type='filestore')
def upload(self, src_path, dst_path, location='all', force=False):
def upload(self, src_path: str, dst_path: str, location='all', force=False) -> List[Transport]:
transports = []
for t in self.slice(location):
if force or not t.exists(dst_path):
@@ -251,7 +256,7 @@ def upload(self, src_path, dst_path, location='all', force=False):
return transports

@elasticapm.capture_span(span_type='filestore')
def upload_batch(self, local_remote_tuples, location='all'):
def upload_batch(self, local_remote_tuples, location='all') -> List[Tuple[str, str, str]]:
failed_tuples = []
for (src_path, dst_path) in local_remote_tuples:
try:
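Most of the changes above are annotations, but they document real behaviour: `put`, `upload`, and `exists` return the list of transports that handled the path, and `get` returns raw bytes (or `None` when no transport has the file). A short sketch against a local transport; the `file://` URL and key are illustrative:

```python
from assemblyline.filestore import FileStore

fs = FileStore("file:///tmp/al-storage")

# put() returns the transports the content was written to
written_to = fs.put("example_key", b"hello world", force=True)
print(f"stored on {len(written_to)} transport(s)")

# exists() likewise returns the transports that report the file
assert fs.exists("example_key")

# get() returns the raw bytes
assert fs.get("example_key") == b"hello world"

fs.delete("example_key")
```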
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/azure.py
@@ -134,7 +134,7 @@ def upload(self, src_path, dst_path):
raise

# Buffer based functions
def get(self, path):
def get(self, path: str) -> bytes:
key = self.normalize(path)
my_blob = BytesIO()

2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/ftp.py
@@ -200,7 +200,7 @@ def upload_batch(self, local_remote_tuples):

# Buffer based functions
@reconnect_retry_on_fail
def get(self, path) -> bytes:
def get(self, path: str) -> bytes:
path = self.normalize(path)
bio = BytesIO()
self.ftp.retrbinary('RETR ' + path, bio.write)
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/http.py
@@ -98,7 +98,7 @@ def upload_batch(self, local_remote_tuples):
raise TransportException("READ ONLY TRANSPORT: Method not implemented")

# Buffer based functions
def get(self, path):
def get(self, path: str) -> bytes:
path = self.normalize(path)
resp = self.session.get(path, auth=self.auth, cert=self.pki, verify=self.verify)
if resp.ok:
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/local.py
@@ -89,7 +89,7 @@ def upload(self, src_path, dst_path):
assert (self.exists(dst_path))

# Buffer based functions
def get(self, path):
def get(self, path: str) -> bytes:
path = self.normalize(path)
fh = None
try:
2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/s3.py
@@ -154,7 +154,7 @@ def upload(self, src_path, dst_path):
self.with_retries(self.client.upload_file, src_path, self.bucket, dst_path)

# Buffer based functions
def get(self, path):
def get(self, path: str) -> bytes:
fd, dst_path = tempfile.mkstemp(prefix="s3_transport.", suffix=".download")
os.close(fd) # We don't need the file descriptor open

2 changes: 1 addition & 1 deletion assemblyline/filestore/transport/sftp.py
@@ -140,7 +140,7 @@ def upload_batch(self, local_remote_tuples):

# Buffer based functions
@reconnect_retry_on_fail
def get(self, path):
def get(self, path: str) -> bytes:
path = self.normalize(path)
bio = BytesIO()
with self.sftp.open(path) as sftp_handle:
3 changes: 3 additions & 0 deletions assemblyline/odm/models/config.py
@@ -703,6 +703,8 @@ class Services(odm.Model):
stages: List[str] = odm.List(odm.Keyword())
# Substitution variables for image paths (for custom registry support)
image_variables: Dict[str, str] = odm.Mapping(odm.Keyword(default=''))
# Same as above, but only applied in the updater, used in dev setups and local registries
update_image_variables: Dict[str, str] = odm.Mapping(odm.Keyword(default=''))
# Default update channel to be used for new services
preferred_update_channel: str = odm.Keyword()
# Allow container registries with self signed certs for service updates
Expand All @@ -720,6 +722,7 @@ class Services(odm.Model):
"min_service_workers": 0,
"stages": SERVICE_STAGES,
"image_variables": {},
"update_image_variables": {},
"preferred_update_channel": "stable",
"allow_insecure_registry": False,
"cpu_reservation": 0.25
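`update_image_variables` mirrors `image_variables` but is only consulted by the updater, so a dev setup can have update containers pull from a local registry while the rest of the system uses the normal one. A hedged sketch of the kind of substitution these mappings drive; the `${REGISTRY}` placeholder and the use of `string.Template` are illustrative assumptions, not the actual scaler/updater code:

```python
from string import Template

# Hypothetical values for the two mappings defined above
image_variables = {"REGISTRY": "cccs"}                    # applied system-wide
update_image_variables = {"REGISTRY": "localhost:32000"}  # updater only

image = "${REGISTRY}/assemblyline-service-extract:latest"

print(Template(image).safe_substitute(image_variables))         # cccs/...
print(Template(image).safe_substitute(update_image_variables))  # localhost:32000/...
```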
33 changes: 17 additions & 16 deletions assemblyline/odm/models/service.py
@@ -9,8 +9,8 @@

@odm.model(index=False, store=False)
class EnvironmentVariable(odm.Model):
name = odm.Keyword()
value = odm.Keyword()
name: str = odm.Keyword()
value: str = odm.Keyword()


@odm.model(index=False, store=False)
@@ -39,20 +39,21 @@ class PersistentVolume(odm.Model):
class DependencyConfig(odm.Model):
container = odm.Compound(DockerConfig)
volumes = odm.Mapping(odm.Compound(PersistentVolume), default={})
run_as_core: bool = odm.Boolean(default=False)


@odm.model(index=False, store=False)
class UpdateSource(odm.Model):
name = odm.Keyword()
password = odm.Optional(odm.Keyword(default=""))
pattern = odm.Optional(odm.Keyword(default=""))
private_key = odm.Optional(odm.Keyword(default=""))
ca_cert = odm.Optional(odm.Keyword(default=""))
ssl_ignore_errors = odm.Boolean(default=False)
proxy = odm.Optional(odm.Keyword(default=""))
uri = odm.Keyword()
username = odm.Optional(odm.Keyword(default=""))
headers = odm.List(odm.Compound(EnvironmentVariable), default=[])
name: str = odm.Keyword()
password: Opt[str] = odm.Optional(odm.Keyword(default=""))
pattern: Opt[str] = odm.Optional(odm.Keyword(default=""))
private_key: Opt[str] = odm.Optional(odm.Keyword(default=""))
ca_cert: Opt[str] = odm.Optional(odm.Keyword(default=""))
ssl_ignore_errors: bool = odm.Boolean(default=False)
proxy: Opt[str] = odm.Optional(odm.Keyword(default=""))
uri: str = odm.Keyword()
username: Opt[str] = odm.Optional(odm.Keyword(default=""))
headers: list[EnvironmentVariable] = odm.List(odm.Compound(EnvironmentVariable), default=[])
default_classification = odm.Classification(default=Classification.UNRESTRICTED)


@@ -62,9 +63,9 @@ class UpdateConfig(odm.Model):
generates_signatures = odm.Boolean(index=True, default=False)
method = odm.Enum(values=['run', 'build']) # Are we going to run or build a container?
run_options = odm.Optional(odm.Compound(DockerConfig)) # If we are going to run a container, which one?
sources = odm.List(odm.Compound(UpdateSource), default=[]) # Generic external resources we need
update_interval_seconds = odm.Integer() # Update check interval in seconds
wait_for_update = odm.Boolean(default=False)
sources: list[UpdateSource] = odm.List(odm.Compound(UpdateSource), default=[]) # Generic external resources we need
update_interval_seconds: int = odm.Integer() # Update check interval in seconds
wait_for_update: bool = odm.Boolean(default=False)


@odm.model(index=False, store=False)
@@ -90,7 +91,7 @@ class Service(odm.Model):
is_external = odm.Boolean(default=False)
licence_count = odm.Integer(default=0)

name = odm.Keyword(store=True, copyto="__text__")
name: str = odm.Keyword(store=True, copyto="__text__")
version = odm.Keyword(store=True)

# Should the result cache be disabled for this service
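The functional change in this file is the new `run_as_core` flag on `DependencyConfig`, which marks a service dependency to be run alongside the core components. A minimal construction sketch; Assemblyline ODM models are built from dicts, and the image value (plus any other required `DockerConfig` fields) is illustrative:

```python
from assemblyline.odm.models.service import DependencyConfig

dep = DependencyConfig({
    "container": {"image": "localhost:32000/assemblyline-service-updater:dev0"},
    "run_as_core": True,  # new flag: schedule this dependency as a core container
})
assert dep.run_as_core is True
```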
1 change: 1 addition & 0 deletions assemblyline/odm/models/service_delta.py
@@ -48,6 +48,7 @@ class PersistentVolumeDelta(odm.Model):
class DependencyConfigDelta(odm.Model):
container = odm.Optional(odm.Compound(DockerConfigDelta))
volumes = odm.Mapping(odm.Compound(PersistentVolumeDelta), default={})
run_as_core: bool = odm.Optional(odm.Boolean())


@odm.model(index=False, store=False)
32 changes: 32 additions & 0 deletions dev/k8s/README.md
@@ -0,0 +1,32 @@
# Assemblyline Dev Setup (Kubernetes)
- Follow the steps in the [K8s appliance](https://github.com/CybercentreCanada/assemblyline-helm-chart/tree/master/appliance) guide for a local Kubernetes setup
- Enable the registry add-on for microk8s (other registries, such as Harbor, can be used but involve more setup, which isn't covered here)
  - Test: `curl localhost:32000/v2/_catalog`
- When ready to build, run the `local_dev_containers.sh` script with a tag as its parameter
- Run `helm install|upgrade` using the new tags in `values.yaml`
- Use Lens or `kubectl` to monitor the status of the deployment
- You can create local service-base images by passing an optional build-arg to a `docker build` command; otherwise the latest image will be pulled
  - e.g. `docker build . -f service-base.Dockerfile --build-arg build_no=dev0`
- Debugging: VS Code's [Bridge to Kubernetes](https://marketplace.visualstudio.com/items?itemName=ms-azuretools.mindaro) and
  [Kubernetes](https://marketplace.visualstudio.com/items?itemName=ms-kubernetes-tools.vscode-kubernetes-tools) extensions
  - Add the following to `settings.json` (assuming microk8s is installed from a snap):
```
"vs-kubernetes": {
"vs-kubernetes.namespace": "al",
"vs-kubernetes.kubectl-path": "/snap/kubectl/current/kubectl",
"vs-kubernetes.helm-path": "/snap/helm/current/helm",
"vs-kubernetes.minikube-path": "/snap/bin/microk8s",
"vs-kubernetes.kubectlVersioning": "user-provided",
"vs-kubernetes.outputFormat": "yaml",
"vs-kubernetes.kubeconfig": "/var/snap/microk8s/current/credentials/client.config",
"vs-kubernetes.knownKubeconfigs": [],
"vs-kubernetes.autoCleanupOnDebugTerminate": false,
"vs-kubernetes.nodejs-autodetect-remote-root": true,
"vs-kubernetes.nodejs-remote-root": "",
"vs-kubernetes.nodejs-debug-port": 9229,
"vs-kubernetes.local-tunnel-debug-provider": "",
"checkForMinikubeUpgrade": false,
"imageBuildTool": "Docker"
}
```
- Specific to the updater/scaler: your launch targets must set an environment variable named `KUBECONFIG` that points to your kubeconfig file, for example:
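  A hedged sketch of the access this enables via the Kubernetes Python client (the `al` namespace matches the settings above; this is illustrative, not the actual scaler code):

  ```python
  import os
  from kubernetes import client, config

  # Launch targets export KUBECONFIG so load_kube_config() can find the
  # microk8s credentials file referenced in settings.json above
  config.load_kube_config(config_file=os.environ["KUBECONFIG"])

  v1 = client.CoreV1Api()
  for pod in v1.list_namespaced_pod(namespace="al").items:
      print(pod.metadata.name, pod.status.phase)
  ```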