Skip to content

Commit 8cc146b

Browse files
Pierre-Etienne BouguéPierre-Etienne Bougue
Pierre-Etienne Bougué
authored and
Pierre-Etienne Bougue
committed
Revert "[Jormungandr] Use transient socket for instance, pt planner and asgard, AND remove socket reaper"
1 parent 5abdae1 commit 8cc146b

File tree

11 files changed

+351
-168
lines changed

11 files changed

+351
-168
lines changed

source/jormungandr/jormungandr/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,13 @@
3232
from __future__ import absolute_import, print_function, unicode_literals, division
3333
import logging
3434
import logging.config
35+
import os
3536
from flask import Flask, got_request_exception
3637
from flask_restful import Api
3738
from flask_caching import Cache
3839
from flask_cors import CORS
40+
import sys
41+
import six
3942
from jormungandr import init
4043

4144
app = Flask(__name__) # type: Flask
@@ -123,4 +126,7 @@ def setup_package():
123126
i_manager.stop()
124127

125128

129+
from jormungandr import transient_socket
130+
131+
transient_socket.TransientSocket.init_socket_reaper(app.config)
126132
i_manager.is_ready = True

source/jormungandr/jormungandr/default_settings.py

+4-9
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,6 @@
1111
# path of the configuration file for each instances
1212
INSTANCES_DIR = os.getenv('JORMUNGANDR_INSTANCES_DIR', '/etc/jormungandr.d')
1313

14-
15-
INSTANCE_TIMEOUT = float(os.getenv('JORMUNGANDR_INSTANCE_TIMEOUT_S', 10))
16-
INSTANCE_FAST_TIMEOUT = float(os.getenv('JORMUNGANDR_INSTANCE_FAST_TIMEOUT_S', 1))
17-
1814
# Pattern that matches Jormungandr configuration files
1915
#  ex: '*.json' will match all json files within "INSTANCES_DIR" directory
2016
INSTANCES_FILENAME_PATTERN = os.getenv('JORMUNGANDR_INSTANCES_FILENAME_PATTERN', '*.json')
@@ -223,13 +219,12 @@
223219
MAX_JOURNEYS_CALLS = int(os.getenv('JORMUNGANDR_MAX_JOURNEYS_CALLS', 20))
224220
DEFAULT_AUTOCOMPLETE_BACKEND = os.getenv('JORMUNGANDR_DEFAULT_AUTOCOMPLETE_BACKEND', 'bragi')
225221

226-
227-
# ZMQ
222+
ZMQ_SOCKET_TTL_SECONDS = int(os.getenv('JORMUNGANDR_ZMQ_SOCKET_TTL_SECONDS', 10))
223+
ZMQ_SOCKET_REAPER_INTERVAL = int(os.getenv('JORMUNGANDR_ZMQ_SOCKET_REAPER_INTERVAL', 10))
228224
ZMQ_DEFAULT_SOCKET_TYPE = os.getenv('JORMUNGANDR_ZMQ_DEFAULT_SOCKET_TYPE', 'persistent')
229225

230-
ZMQ_SOCKET_TTL_SECONDS = float(os.getenv('JORMUNGANDR_ZMQ_SOCKET_TTL_SECONDS', 10))
231-
ASGARD_ZMQ_SOCKET_TTL_SECONDS = float(os.getenv('JORMUNGANDR_ASGARD_ZMQ_SOCKET_TTL_SECONDS', 10))
232-
226+
ASGARD_ZMQ_SOCKET_TTL_SECONDS = int(os.getenv('JORMUNGANDR_ASGARD_ZMQ_SOCKET_TTL_SECONDS', 30))
227+
ASGARD_ZMQ_SOCKET_REAPER_INTERVAL = int(os.getenv('JORMUNGANDR_ASGARD_ZMQ_SOCKET_REAPER_INTERVAL', 10))
233228

234229
# Variable used only when deploying on aws
235230
ASGARD_ZMQ_SOCKET = os.getenv('JORMUNGANDR_ASGARD_ZMQ_SOCKET')

source/jormungandr/jormungandr/instance.py

+96-29
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
from jormungandr.equipments import EquipmentProviderManager
6666
from jormungandr.external_services import ExternalServiceManager
6767
from jormungandr.utils import can_connect_to_database
68-
from jormungandr import pt_planners_manager, transient_socket
68+
from jormungandr import pt_planners_manager
6969

7070
type_to_pttype = {
7171
"stop_area": request_pb2.PlaceCodeRequest.StopArea, # type: ignore
@@ -101,8 +101,9 @@ def _getter(self):
101101
return property(_getter)
102102

103103

104-
class Instance(transient_socket.TransientSocket):
104+
class Instance(object):
105105
name = None # type: Text
106+
_sockets = None # type: Deque[Tuple[zmq.Socket, float]]
106107

107108
def __init__(
108109
self,
@@ -121,15 +122,9 @@ def __init__(
121122
ghost_words=None,
122123
instance_db=None,
123124
):
124-
super(Instance, self).__init__(
125-
name=name,
126-
zmq_context=context,
127-
zmq_socket=zmq_socket,
128-
socket_ttl=app.config.get(str('ZMQ_SOCKET_TTL_SECONDS'), 10),
129-
)
130-
131125
self.geom = None
132126
self.geojson = None
127+
self._sockets = deque()
133128
self.socket_path = zmq_socket
134129
self._scenario = None
135130
self._scenario_name = None
@@ -722,6 +717,66 @@ def places_proximity_radius(self):
722717
instance_db = self.get_models()
723718
return get_value_or_default('places_proximity_radius', instance_db, self.name)
724719

720+
def reap_socket(self, ttl):
    # type: (int) -> None
    """
    Close the pooled sockets whose timestamp is older than ``ttl`` seconds.

    Nothing is done unless ``self.zmq_socket_type`` is 'transient'.
    The pools of the pt planners reporting ``is_zmq_socket()`` are handled first,
    then the pool of the instance itself.

    :param ttl: maximum age, in seconds, of a socket kept in a pool
    """
    if self.zmq_socket_type != 'transient':
        return

    log = logging.getLogger(__name__)
    reference_time = time.time()

    def _close_expired(owner):
        # The pool is consumed from its left end; the scan stops at the first entry
        # that is still within the ttl, or when the pool is empty.
        try:
            while True:
                sock, stamp = owner._sockets.popleft()
                if reference_time - stamp > ttl:
                    log.debug("closing one socket for %s", owner.name)
                    # LINGER 0: pending messages are dropped, close does not block
                    sock.setsockopt(zmq.LINGER, 0)
                    sock.close()
                    continue
                # the remaining sockets are still in "keep alive" state
                owner._sockets.appendleft((sock, stamp))
                return
        except IndexError:
            return

    for _, pt_planner in self._pt_planner_manager.get_all_pt_planners():
        if not pt_planner.is_zmq_socket():
            continue
        _close_expired(pt_planner)
    _close_expired(self)
745+
746+
@contextmanager
def socket(self, context):
    """
    Lend a zmq REQ socket connected to ``self.socket_path`` for the duration of a ``with`` block.

    A socket is taken from the right end of ``self._sockets`` when one is available,
    otherwise a new one is created from ``context`` and connected.

    When the block ends, a socket that is still open is either:
     - closed with LINGER 0 if the block raised, or if the socket came from the pool and its
       timestamp is older than the ZMQ_SOCKET_TTL_SECONDS setting (10 when not configured)
     - pushed back to the pool otherwise

    Closing after an error matters: a REQ socket that sent a request without receiving its
    reply cannot send again, so putting it back in the pool would make the next requests
    fail as well.

    :param context: zmq context used to create a socket when the pool is empty
    """
    logger = logging.getLogger(__name__)
    sockets = self._sockets
    socket_path = self.socket_path

    t = None
    try:
        sock, t = sockets.pop()
    except IndexError:  # there is no socket available: lets create one
        start = time.time()
        sock = context.socket(zmq.REQ)
        try:
            sock.connect(socket_path)
        except Exception:
            # do not leak the socket we just created when the connection cannot be set up
            sock.close()
            raise
        logger.info(
            "it took %s ms to open a instance socket of %s during a request",
            '%.2e' % ((time.time() - start) * 1000),
            self.name,
        )

    succeeded = False
    try:
        yield sock
        # only reached when the with block ended without raising
        succeeded = True
    finally:
        # the caller may already have closed the socket (e.g. on a poll timeout)
        if not sock.closed:
            expired = t is not None and time.time() - t > app.config.get("ZMQ_SOCKET_TTL_SECONDS", 10)
            if expired or not succeeded:
                start = time.time()
                sock.setsockopt(zmq.LINGER, 0)
                sock.close()
                logger.info(
                    "it took %s ms to close a instance socket in %s",
                    '%.2e' % ((time.time() - start) * 1000),
                    self.name,
                )
            else:
                # a socket used for the first time gets its timestamp now
                sockets.append((sock, t or time.time()))
779+
725780
def send_and_receive(self, *args, **kwargs):
726781
"""
727782
encapsulate all calls to kraken in a circuit breaker, this way we don't lose time calling a dead instance
@@ -731,25 +786,37 @@ def send_and_receive(self, *args, **kwargs):
731786
except pybreaker.CircuitBreakerError as e:
732787
raise DeadSocketException(self.name, self.socket_path)
733788

734-
def _send_and_receive(self, request, timeout=app.config.get('INSTANCE_TIMEOUT', 10), quiet=False, **kwargs):
735-
deadline = datetime.utcnow() + timedelta(milliseconds=timeout * 1000)
789+
def _send_and_receive(
790+
self, request, timeout=app.config.get('INSTANCE_TIMEOUT', 10000), quiet=False, **kwargs
791+
):
792+
logger = logging.getLogger(__name__)
793+
deadline = datetime.utcnow() + timedelta(milliseconds=timeout)
736794
request.deadline = deadline.strftime('%Y%m%dT%H%M%S,%f')
737795

738-
if 'request_id' in kwargs and kwargs['request_id']:
739-
request.request_id = kwargs['request_id']
740-
else:
741-
try:
742-
request.request_id = flask.request.id
743-
except RuntimeError:
744-
# we aren't in a flask context, so there is no request
745-
if 'flask_request_id' in kwargs and kwargs['flask_request_id']:
746-
request.request_id = kwargs['flask_request_id']
747-
748-
pb = self.call(request.SerializeToString(), timeout, debug_cb=lambda: six.text_type(request))
749-
resp = response_pb2.Response()
750-
resp.ParseFromString(pb)
751-
self.update_property(resp)
752-
return resp
796+
with self.socket(self.context) as socket:
797+
if 'request_id' in kwargs and kwargs['request_id']:
798+
request.request_id = kwargs['request_id']
799+
else:
800+
try:
801+
request.request_id = flask.request.id
802+
except RuntimeError:
803+
# we aren't in a flask context, so there is no request
804+
if 'flask_request_id' in kwargs and kwargs['flask_request_id']:
805+
request.request_id = kwargs['flask_request_id']
806+
807+
socket.send(request.SerializeToString())
808+
if socket.poll(timeout=timeout) > 0:
809+
pb = socket.recv()
810+
resp = response_pb2.Response()
811+
resp.ParseFromString(pb)
812+
self.update_property(resp) # we update the timezone and geom of the instances at each request
813+
return resp
814+
else:
815+
socket.setsockopt(zmq.LINGER, 0)
816+
socket.close()
817+
if not quiet:
818+
logger.error('request on %s failed: %s', self.socket_path, six.text_type(request))
819+
raise DeadSocketException(self.name, self.socket_path)
753820

754821
def get_id(self, id_):
755822
"""
@@ -758,7 +825,7 @@ def get_id(self, id_):
758825
req = request_pb2.Request()
759826
req.requested_api = type_pb2.place_uri
760827
req.place_uri.uri = id_
761-
return self.send_and_receive(req, timeout=app.config.get(str('INSTANCE_FAST_TIMEOUT'), 1))
828+
return self.send_and_receive(req, timeout=app.config.get('INSTANCE_FAST_TIMEOUT', 1000))
762829

763830
def has_id(self, id_):
764831
"""
@@ -796,7 +863,7 @@ def get_external_codes(self, type_, id_):
796863
req.place_code.type_code = "external_code"
797864
req.place_code.code = id_
798865
# we set the timeout to 1s
799-
return self.send_and_receive(req, timeout=app.config.get(str('INSTANCE_FAST_TIMEOUT'), 1))
866+
return self.send_and_receive(req, timeout=app.config.get('INSTANCE_FAST_TIMEOUT', 1000))
800867

801868
def has_external_code(self, type_, id_):
802869
"""
@@ -852,7 +919,7 @@ def init(self):
852919
request_id = "instance_init"
853920
try:
854921
# we use _send_and_receive to avoid the circuit breaker, we don't want fast fail on init :)
855-
resp = self._send_and_receive(req, request_id=request_id, timeout=1, quiet=True)
922+
resp = self._send_and_receive(req, request_id=request_id, timeout=1000, quiet=True)
856923
# the instance is automatically updated on a call
857924
if self.publication_date != pub_date:
858925
return True

source/jormungandr/jormungandr/instance_manager.py

+49
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ def __init__(
100100
self.start_ping = start_ping
101101
self.instances = {}
102102
self.context = zmq.Context()
103+
self.socket_ttl = app.config.get("ZMQ_SOCKET_TTL_SECONDS", 10)
104+
self.reaper_interval = app.config.get("ZMQ_SOCKET_REAPER_INTERVAL", 10)
103105
self.is_ready = False
106+
self.init_socket_reaper()
104107

105108
def __repr__(self):
106109
return '<InstanceManager>'
@@ -206,6 +209,52 @@ def init_kraken_instances(self):
206209
self._clear_cache()
207210
break
208211

212+
def init_socket_reaper(self):
    """
    Schedule the periodic closing of idle sockets.

    A greenlet running ``socket_reaper_thread`` is always spawned.
    When running in uwsgi without its gevent option, a uwsgi timer is registered as well:
    idle workers would not run the greenlet, it would only be scheduled while waiting for
    the response of an external service (kraken mostly).
    """
    log = logging.getLogger(__name__)

    # greenlet that handles connection closing when idle
    log.info("spawning a socket reaper with gevent")
    gevent.spawn(self.socket_reaper_thread)

    reaper_signal = 1
    try:
        import uwsgi

        # In gevent mode there is no need for a timer, the greenlet is scheduled while
        # waiting for incoming requests.
        if 'gevent' in uwsgi.opt:
            return

        log.info("spawning a socket reaper with uwsgi timer")

        # Handler of the uwsgi signal 1: it triggers the socket reaper and can be run
        # by any worker.
        def _on_reaper_signal(signum):
            self.reap_sockets()

        uwsgi.register_signal(reaper_signal, 'active-workers', _on_reaper_signal)
        # the timer raises this signal every reaper_interval seconds
        uwsgi.add_timer(reaper_signal, self.reaper_interval)
    except (ImportError, ValueError):
        # ImportError: we aren't running in uwsgi.
        # ValueError: no more timer available, only 64 timers can be created.
        # Workers that didn't create a timer can still run the signal handler if uwsgi
        # dispatches the signal to them (signals are dispatched randomly, not round robin).
        log.info("No more uwsgi timer available or not running in uwsgi, only gevent will be used")
247+
248+
def reap_sockets(self):
    """
    Ask every instance of ``self.instances`` to reap its sockets, using ``self.socket_ttl`` as ttl.
    """
    for inst in self.instances.values():
        inst.reap_socket(self.socket_ttl)
        # request handling has the priority
        gevent.idle(-1)
252+
253+
def socket_reaper_thread(self, disable_gevent=False):
    """
    Reap the idle sockets forever, sleeping ``self.reaper_interval`` seconds between two passes.

    This is the function spawned as a greenlet by ``init_socket_reaper``.

    :param disable_gevent: not used, kept so that existing callers are not broken
    """
    logger = logging.getLogger(__name__)
    while True:
        try:
            self.reap_sockets()
        except Exception:
            # An uncaught error would end this loop for good and no socket would be
            # reaped anymore: log it and try again at the next pass.
            logger.exception("socket reaper failed, next try in %s seconds", self.reaper_interval)
        gevent.sleep(self.reaper_interval)
257+
209258
def thread_ping(self, timer=10):
210259
"""
211260
fetch krakens metadata

0 commit comments

Comments
 (0)