diff --git a/source/jormungandr/jormungandr/default_settings.py b/source/jormungandr/jormungandr/default_settings.py index fbf17713c2..cb8e20fa89 100644 --- a/source/jormungandr/jormungandr/default_settings.py +++ b/source/jormungandr/jormungandr/default_settings.py @@ -137,6 +137,8 @@ ISOCHRONE_DEFAULT_VALUE = os.getenv('JORMUNGANDR_ISOCHRONE_DEFAULT_VALUE', 1800) # in s # circuit breaker parameters. +CIRCUIT_BREAKER_MAX_INSTANCE_FAIL = 4 # max instance call failures before stopping attempt +CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S = 60 # the circuit breaker retries after this timeout (in seconds) CIRCUIT_BREAKER_MAX_TIMEO_FAIL = 4 # max instance call failures before stopping attempt CIRCUIT_BREAKER_TIMEO_TIMEOUT_S = 60 # the circuit breaker retries after this timeout (in seconds) diff --git a/source/jormungandr/jormungandr/instance.py b/source/jormungandr/jormungandr/instance.py index 0524633237..8915f67c82 100644 --- a/source/jormungandr/jormungandr/instance.py +++ b/source/jormungandr/jormungandr/instance.py @@ -54,6 +54,7 @@ from shapely.geos import PredicateError, ReadingError, TopologicalError from flask import g import flask +import pybreaker from jormungandr import georef, schedule, realtime_schedule, ptref, street_network, fallback_modes from jormungandr.scenarios.ridesharing.ridesharing_service_manager import RidesharingServiceManager import six @@ -138,6 +139,10 @@ def __init__( self.timezone = None # timezone will be fetched from the kraken self.publication_date = -1 self.is_initialized = False # kraken hasn't been called yet we don't have geom nor timezone + self.breaker = pybreaker.CircuitBreaker( + fail_max=app.config.get(str('CIRCUIT_BREAKER_MAX_INSTANCE_FAIL'), 5), + reset_timeout=app.config.get(str('CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S'), 60), + ) self.georef = georef.Kraken(self) self._streetnetwork_backend_manager = streetnetwork_backend_manager @@ -717,14 +722,21 @@ def places_proximity_radius(self): instance_db = self.get_models() return 
get_value_or_default('places_proximity_radius', instance_db, self.name) - def send_and_receive( - self, request, timeout=app.config.get('INSTANCE_TIMEOUT', 10), quiet=False, request_id=None, **kwargs - ): + def send_and_receive(self, *args, **kwargs): + """ + encapsulate all call to kraken in a circuit breaker, this way we don't lose time calling dead instance + """ + try: + return self.breaker.call(self._send_and_receive, *args, **kwargs) + except pybreaker.CircuitBreakerError as e: + raise DeadSocketException(self.name, self.socket_path) + + def _send_and_receive(self, request, timeout=app.config.get('INSTANCE_TIMEOUT', 10), quiet=False, **kwargs): deadline = datetime.utcnow() + timedelta(milliseconds=timeout * 1000) request.deadline = deadline.strftime('%Y%m%dT%H%M%S,%f') - if request_id: - request.request_id = request_id + if 'request_id' in kwargs and kwargs['request_id']: + request.request_id = kwargs['request_id'] else: try: request.request_id = flask.request.id @@ -839,7 +851,8 @@ def init(self): req.requested_api = type_pb2.METADATAS request_id = "instance_init" try: - resp = self.send_and_receive(req, request_id=request_id, timeout=1, quiet=True) + # we use _send_and_receive to avoid the circuit breaker, we don't want fast fail on init :) + resp = self._send_and_receive(req, request_id=request_id, timeout=1, quiet=True) # the instance is automatically updated on a call if self.publication_date != pub_date: return True diff --git a/source/jormungandr/jormungandr/pt_planners/common.py b/source/jormungandr/jormungandr/pt_planners/common.py index 170f7d8756..5fba7aabad 100644 --- a/source/jormungandr/jormungandr/pt_planners/common.py +++ b/source/jormungandr/jormungandr/pt_planners/common.py @@ -29,6 +29,7 @@ from __future__ import absolute_import, print_function, unicode_literals, division import logging +import pybreaker import zmq from datetime import datetime, timedelta @@ -59,13 +60,17 @@ def __init__( name=name, zmq_context=zmq_context, 
zmq_socket=zmq_socket, socket_ttl=socket_ttl ) self.timeout = timeout + self.breaker = pybreaker.CircuitBreaker( + fail_max=app.config.get(str('CIRCUIT_BREAKER_MAX_INSTANCE_FAIL'), 5), + reset_timeout=app.config.get(str('CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S'), 60), + ) - def send_and_receive(self, request, quiet=False, request_id=None, **kwargs): + def _send_and_receive(self, request, quiet=False, **kwargs): deadline = datetime.utcnow() + timedelta(milliseconds=self.timeout * 1000) request.deadline = deadline.strftime('%Y%m%dT%H%M%S,%f') - if request_id: - request.request_id = request_id + if 'request_id' in kwargs and kwargs['request_id']: + request.request_id = kwargs['request_id'] else: try: request.request_id = flask.request.id @@ -81,6 +86,15 @@ def send_and_receive(self, request, quiet=False, request_id=None, **kwargs): resp.ParseFromString(pb) return resp + def send_and_receive(self, *args, **kwargs): + """ + encapsulate all call to kraken in a circuit breaker, this way we don't lose time calling dead instance + """ + try: + return self.breaker.call(self._send_and_receive, *args, **kwargs) + except pybreaker.CircuitBreakerError: + raise DeadSocketException(self.name, self._zmq_socket) + def clean_up_zmq_sockets(self): for socket in self._sockets: socket.setsockopt(zmq.LINGER, 0) diff --git a/source/jormungandr/tests/integration_tests_settings.py b/source/jormungandr/tests/integration_tests_settings.py index 7900bb7b0b..ea87a2ff67 100644 --- a/source/jormungandr/tests/integration_tests_settings.py +++ b/source/jormungandr/tests/integration_tests_settings.py @@ -65,6 +65,9 @@ } } +# circuit breaker parameters, for the tests by default we don't want the circuit breaker +CIRCUIT_BREAKER_MAX_INSTANCE_FAIL = 99999 +CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S = 1 GRAPHICAL_ISOCHRONE = True HEAT_MAP = True