Surface some container events back in UI #820

Open · wants to merge 1 commit into master
11 changes: 10 additions & 1 deletion assemblyline_core/scaler/controllers/interface.py
@@ -1,6 +1,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional


if TYPE_CHECKING:
from assemblyline_core.scaler.scaler_server import ServiceProfile

@@ -11,6 +12,14 @@ def __init__(self, message, service_name):
self.service_name = service_name


class ContainerEvent:
def __init__(self, object_name: str, message: str, service_name=None, updater=None) -> None:
self.object_name = object_name
self.message = message
self.service_name = service_name
self.updater = updater


class ControllerInterface:
def add_profile(self, profile, scale=0):
"""Tell the controller about a service profile it needs to manage."""
@@ -50,7 +59,7 @@ def restart(self, service: ServiceProfile):
def get_running_container_names(self):
raise NotImplementedError()

def new_events(self):
def new_events(self) -> list[ContainerEvent]:
return []

def stateful_container_key(self, service_name: str, container_name: str, spec, change_key: str) -> Optional[str]:
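For reviewers unfamiliar with the controller interface, here is a minimal sketch (not part of this PR; InMemoryController and note_failure are made-up names) of how a concrete controller would now hand structured ContainerEvent objects back through new_events() instead of bare strings:

from assemblyline_core.scaler.controllers.interface import ContainerEvent, ControllerInterface

class InMemoryController(ControllerInterface):
    """Hypothetical controller used only to illustrate the new interface."""
    def __init__(self):
        self._pending: list[ContainerEvent] = []

    def note_failure(self, pod_name: str, message: str, service_name=None, updater=None):
        # Collect an event the way kubernetes_ctl.py does after parsing pod names.
        self._pending.append(ContainerEvent(pod_name, message, service_name, updater))

    def new_events(self) -> list[ContainerEvent]:
        # Drain everything gathered since the last poll; scaler_server.py decides
        # whether each event is merely logged or also written as a service error.
        events, self._pending = self._pending, []
        return events
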
62 changes: 57 additions & 5 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
@@ -21,11 +21,11 @@
V1PersistentVolumeClaimSpec, V1NetworkPolicy, V1NetworkPolicySpec, V1NetworkPolicyEgressRule, V1NetworkPolicyPeer, \
V1NetworkPolicyIngressRule, V1Secret, V1SecretVolumeSource, V1LocalObjectReference, V1Service, \
V1ServiceSpec, V1ServicePort, V1PodSecurityContext, V1Probe, V1ExecAction, V1SecurityContext, \
V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement
V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement, CoreV1Event
from kubernetes.client.rest import ApiException
from assemblyline.odm.models.service import DependencyConfig, DockerConfig, PersistentVolume

from assemblyline_core.scaler.controllers.interface import ControllerInterface
from assemblyline_core.scaler.controllers.interface import ContainerEvent, ControllerInterface

# RESERVE_MEMORY_PER_NODE = os.environ.get('RESERVE_MEMORY_PER_NODE')

@@ -254,7 +254,7 @@ def __init__(self, logger, namespace: str, prefix: str, priority: str, dependenc
config.load_kube_config(client_configuration=cfg)

self.running: bool = True
self.prefix: str = prefix.lower()
self.prefix: str = prefix.lower().replace('_', '-')
self.priority: str = priority
self.dependency_priority: str = dependency_priority
self.cpu_reservation: float = max(0.0, min(cpu_reservation, 1.0))
@@ -933,7 +933,51 @@ def get_running_container_names(self):
_request_timeout=API_TIMEOUT)
return [pod.metadata.name for pod in pods.items]

def new_events(self):
@staticmethod
def dropped_message(event: CoreV1Event) -> bool:
"""These aren't messages anyone looking at the """

# These are common ephemeral states
if event.action in ["Scheduling"]:
return True
if event.reason in ["BackOff", "NodeNotReady", "FailedGetResourceMetric", "FailedComputeMetricsReplicas"]:
return True

# There is a more detailed event that starts with "Failed to pull image"
if event.message == "Error: ErrImagePull":
return True
if event.message == "Error: ImagePullBackOff":
return True

# Probes failing on exiting containers is normal
if "Readiness probe errored" in event.message and "container is in CONTAINER_EXITED state" in event.message:
return True
if "Liveness probe errored" in event.message and "container is in CONTAINER_EXITED state" in event.message:
return True

return False

def parse_container_name(self, name: str) -> tuple[Optional[str], Optional[bool]]:
if not name.startswith(self.prefix):
return (None, None)
name = name.removeprefix(self.prefix)

name, _, container_id = name.rpartition('-')
if not container_id:
return (None, None)

name, _, deployment_id = name.rpartition('-')
if not deployment_id:
return (None, None)

updater = False
if name.endswith('-updates'):
name = name.removesuffix('-updates')
updater = True

return (name, updater)

def new_events(self) -> list[ContainerEvent]:
response = self.api.list_namespaced_event(namespace=self.namespace, pretty='false',
field_selector='type=Warning', watch=False,
_request_timeout=API_TIMEOUT)
@@ -943,7 +987,15 @@ def new_events(self):
for event in response.items:
if self.events_window.get(event.metadata.uid, 0) != event.count:
self.events_window[event.metadata.uid] = event.count
new.append(event.involved_object.name + ': ' + event.message)

if KubernetesController.dropped_message(event):
continue

object_name = event.involved_object.name
service_name, is_updater = None, None
if event.involved_object.kind == 'Pod':
service_name, is_updater = self.parse_container_name(object_name)
new.append(ContainerEvent(object_name, event.message, service_name, is_updater))

# Flush out events that have moved outside the window
old = set(self.events_window.keys()) - {event.metadata.uid for event in response.items}
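As a rough illustration of the two new helpers (the pod names and hash suffixes below are invented, and ctl stands for an existing KubernetesController built with prefix='alsvc_', which __init__ now normalises to 'alsvc-'):

from types import SimpleNamespace

ctl.parse_container_name('alsvc-extract-7d9f8c6b5-x2k4j')          # ('extract', False)
ctl.parse_container_name('alsvc-extract-updates-6c4d9b7f4-q8r2n')  # ('extract', True)
ctl.parse_container_name('coredns-565d847f94-abc12')               # (None, None) - not an alsvc pod

# dropped_message() only reads action, reason and message, so a stand-in object
# is enough to see which warnings get screened out before reaching the UI.
noisy = SimpleNamespace(action=None, reason='BackOff', message='Back-off restarting failed container')
KubernetesController.dropped_message(noisy)   # True - routine churn, filtered out
real = SimpleNamespace(action=None, reason='Failed', message='Failed to pull image "x": not found')
KubernetesController.dropped_message(real)    # False - surfaced as a ContainerEvent
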
41 changes: 36 additions & 5 deletions assemblyline_core/scaler/scaler_server.py
@@ -26,16 +26,18 @@
from assemblyline.remote.datatypes.events import EventWatcher, EventSender
from assemblyline.odm.models.service import Service, DockerConfig, EnvironmentVariable
from assemblyline.odm.models.config import Mount
from assemblyline.odm.models.error import Error
from assemblyline.odm.messages.scaler_heartbeat import Metrics
from assemblyline.odm.messages.scaler_status_heartbeat import Status
from assemblyline.odm.messages.changes import ServiceChange, Operation
from assemblyline.common.dict_utils import get_recursive_sorted_tuples
from assemblyline.common.uid import get_id_from_data
from assemblyline.common.uid import get_id_from_data, get_random_id
from assemblyline.common.forge import get_classification, get_service_queue, get_apm_client
from assemblyline.common.constants import SCALER_TIMEOUT_QUEUE, SERVICE_STATE_HASH, ServiceStatus
from assemblyline.common.version import FRAMEWORK_VERSION, SYSTEM_VERSION
from assemblyline.common.isotime import now_as_iso
from assemblyline_core.scaler.controllers import KubernetesController
from assemblyline_core.scaler.controllers.interface import ServiceControlError
from assemblyline_core.scaler.controllers.interface import ContainerEvent, ServiceControlError
from assemblyline_core.server_base import ServiceStage, ThreadedCoreBase

from .controllers import DockerController
@@ -50,6 +52,7 @@
METRIC_SYNC_INTERVAL = 0.5
CONTAINER_EVENTS_LOG_INTERVAL = 2
HEARTBEAT_INTERVAL = 5
SIXTY_DAYS = 60 * 60 * 24 * 60

# The maximum containers we ask to be created in a single scaling iteration
# This is to give kubernetes a chance to update our view of resource usage before we ask for more containers
@@ -70,6 +73,7 @@
SERVICE_API_HOST = os.getenv('SERVICE_API_HOST', None)
UI_SERVER = os.getenv('UI_SERVER', None)
INTERNAL_ENCRYPT = bool(SERVICE_API_HOST and SERVICE_API_HOST.startswith('https'))
SERVICE_PREFIX = 'alsvc-'


@contextmanager
@@ -297,7 +301,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):

if KUBERNETES_AL_CONFIG:
self.log.info(f"Loading Kubernetes cluster interface on namespace: {NAMESPACE}")
self.controller = KubernetesController(logger=self.log, prefix='alsvc_', labels=labels,
self.controller = KubernetesController(logger=self.log, prefix=SERVICE_PREFIX, labels=labels,
namespace=NAMESPACE, priority='al-service-priority',
dependency_priority='al-core-priority',
cpu_reservation=self.config.services.cpu_reservation,
@@ -941,7 +945,34 @@ def export_metrics(self):

def log_container_events(self):
"""The service status table may have references to containers that have crashed. Try to remove them all."""
pool = concurrent.futures.ThreadPoolExecutor(20)
while self.sleep(CONTAINER_EVENTS_LOG_INTERVAL):
with apm_span(self.apm_client, 'log_container_events'):
for message in self.controller.new_events():
self.log.warning("Container Event :: " + message)
for event in self.controller.new_events():
if event.service_name:
pool.submit(self._process_service_event, event)
continue
self.log.warning("Container Event :: " + event.object_name + ": " + event.message)

def _process_service_event(self, event: ContainerEvent):
"""Record the container event in the service event table."""
try:
message = f"Event for service {'updater' if event.updater else ''} container " + \
f"[{event.object_name}] for service: \n" + event.message

error = Error({
'expiry_ts': now_as_iso(SIXTY_DAYS),
'response': {
'message': message,
'service_name': event.service_name,
'service_version': 'UNKNOWN',
'status': 'FAIL_NONRECOVERABLE'
},
'sha256': '0' * 64,
'type': 'UNKNOWN'
})
error_key = error.build_key(service_tool_version=get_random_id())
self.datastore.error.save(error_key, error)

except Exception:
self.log.exception("Error reporting service container event")
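
A minimal sketch of how the new error-record path could be exercised in isolation (not part of the PR; it assumes the assemblyline packages are importable and stubs the running scaler with a plain MagicMock):

from unittest.mock import MagicMock

from assemblyline_core.scaler.controllers.interface import ContainerEvent
from assemblyline_core.scaler.scaler_server import ScalerServer

scaler = MagicMock()   # stand-in for a running ScalerServer; only .datastore and .log are touched
event = ContainerEvent(
    object_name='alsvc-extract-7d9f8c6b5-x2k4j',   # hypothetical pod name
    message='Back-off pulling image "cccs/assemblyline-service-extract"',
    service_name='extract',
    updater=False,
)

# Call the real method against the stub: it should build an Error document with a
# sixty-day expiry and hand it to datastore.error.save(error_key, error).
ScalerServer._process_service_event(scaler, event)
error_key, error = scaler.datastore.error.save.call_args.args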