Skip to content

Commit 0a22409

Browse files
authored
Merge pull request #3919 from hove-io/revert-3917-remove_instance_breaker
Revert "remove instance circuit breaker"
2 parents 762cf9d + 63fa537 commit 0a22409

File tree

4 files changed

+41
-9
lines changed

4 files changed

+41
-9
lines changed

source/jormungandr/jormungandr/default_settings.py

+2
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@
137137
ISOCHRONE_DEFAULT_VALUE = os.getenv('JORMUNGANDR_ISOCHRONE_DEFAULT_VALUE', 1800) # in s
138138

139139
# circuit breaker parameters.
140+
CIRCUIT_BREAKER_MAX_INSTANCE_FAIL = 4 # max instance call failures before stopping attempt
141+
CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S = 60 # the circuit breaker retries after this timeout (in seconds)
140142

141143
CIRCUIT_BREAKER_MAX_TIMEO_FAIL = 4 # max instance call failures before stopping attempt
142144
CIRCUIT_BREAKER_TIMEO_TIMEOUT_S = 60 # the circuit breaker retries after this timeout (in seconds)

source/jormungandr/jormungandr/instance.py

+19-6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from shapely.geos import PredicateError, ReadingError, TopologicalError
5555
from flask import g
5656
import flask
57+
import pybreaker
5758
from jormungandr import georef, schedule, realtime_schedule, ptref, street_network, fallback_modes
5859
from jormungandr.scenarios.ridesharing.ridesharing_service_manager import RidesharingServiceManager
5960
import six
@@ -138,6 +139,10 @@ def __init__(
138139
self.timezone = None # timezone will be fetched from the kraken
139140
self.publication_date = -1
140141
self.is_initialized = False # kraken hasn't been called yet we don't have geom nor timezone
142+
self.breaker = pybreaker.CircuitBreaker(
143+
fail_max=app.config.get(str('CIRCUIT_BREAKER_MAX_INSTANCE_FAIL'), 5),
144+
reset_timeout=app.config.get(str('CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S'), 60),
145+
)
141146
self.georef = georef.Kraken(self)
142147
self._streetnetwork_backend_manager = streetnetwork_backend_manager
143148

@@ -717,14 +722,21 @@ def places_proximity_radius(self):
717722
instance_db = self.get_models()
718723
return get_value_or_default('places_proximity_radius', instance_db, self.name)
719724

720-
def send_and_receive(
721-
self, request, timeout=app.config.get('INSTANCE_TIMEOUT', 10), quiet=False, request_id=None, **kwargs
722-
):
725+
def send_and_receive(self, *args, **kwargs):
726+
"""
727+
encapsulate all call to kraken in a circuit breaker, this way we don't loose time calling dead instance
728+
"""
729+
try:
730+
return self.breaker.call(self._send_and_receive, *args, **kwargs)
731+
except pybreaker.CircuitBreakerError as e:
732+
raise DeadSocketException(self.name, self.socket_path)
733+
734+
def _send_and_receive(self, request, timeout=app.config.get('INSTANCE_TIMEOUT', 10), quiet=False, **kwargs):
723735
deadline = datetime.utcnow() + timedelta(milliseconds=timeout * 1000)
724736
request.deadline = deadline.strftime('%Y%m%dT%H%M%S,%f')
725737

726-
if request_id:
727-
request.request_id = request_id
738+
if 'request_id' in kwargs and kwargs['request_id']:
739+
request.request_id = kwargs['request_id']
728740
else:
729741
try:
730742
request.request_id = flask.request.id
@@ -839,7 +851,8 @@ def init(self):
839851
req.requested_api = type_pb2.METADATAS
840852
request_id = "instance_init"
841853
try:
842-
resp = self.send_and_receive(req, request_id=request_id, timeout=1, quiet=True)
854+
# we use _send_and_receive to avoid the circuit breaker, we don't want fast fail on init :)
855+
resp = self._send_and_receive(req, request_id=request_id, timeout=1, quiet=True)
843856
# the instance is automatically updated on a call
844857
if self.publication_date != pub_date:
845858
return True

source/jormungandr/jormungandr/pt_planners/common.py

+17-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from __future__ import absolute_import, print_function, unicode_literals, division
3030

3131
import logging
32+
import pybreaker
3233
import zmq
3334

3435
from datetime import datetime, timedelta
@@ -59,13 +60,17 @@ def __init__(
5960
name=name, zmq_context=zmq_context, zmq_socket=zmq_socket, socket_ttl=socket_ttl
6061
)
6162
self.timeout = timeout
63+
self.breaker = pybreaker.CircuitBreaker(
64+
fail_max=app.config.get(str('CIRCUIT_BREAKER_MAX_INSTANCE_FAIL'), 5),
65+
reset_timeout=app.config.get(str('CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S'), 60),
66+
)
6267

63-
def send_and_receive(self, request, quiet=False, request_id=None, **kwargs):
68+
def _send_and_receive(self, request, quiet=False, **kwargs):
6469
deadline = datetime.utcnow() + timedelta(milliseconds=self.timeout * 1000)
6570
request.deadline = deadline.strftime('%Y%m%dT%H%M%S,%f')
6671

67-
if request_id:
68-
request.request_id = request_id
72+
if 'request_id' in kwargs and kwargs['request_id']:
73+
request.request_id = kwargs['request_id']
6974
else:
7075
try:
7176
request.request_id = flask.request.id
@@ -81,6 +86,15 @@ def send_and_receive(self, request, quiet=False, request_id=None, **kwargs):
8186
resp.ParseFromString(pb)
8287
return resp
8388

89+
def send_and_receive(self, *args, **kwargs):
90+
"""
91+
encapsulate all call to kraken in a circuit breaker, this way we don't lose time calling dead instance
92+
"""
93+
try:
94+
return self.breaker.call(self._send_and_receive, *args, **kwargs)
95+
except pybreaker.CircuitBreakerError:
96+
raise DeadSocketException(self.name, self._zmq_socket)
97+
8498
def clean_up_zmq_sockets(self):
8599
for socket in self._sockets:
86100
socket.setsockopt(zmq.LINGER, 0)

source/jormungandr/tests/integration_tests_settings.py

+3
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@
6565
}
6666
}
6767

68+
# circuit breaker parameters, for the tests by default we don't want the circuit breaker
69+
CIRCUIT_BREAKER_MAX_INSTANCE_FAIL = 99999
70+
CIRCUIT_BREAKER_INSTANCE_TIMEOUT_S = 1
6871

6972
GRAPHICAL_ISOCHRONE = True
7073
HEAT_MAP = True

0 commit comments

Comments
 (0)