Skip to content

Commit 1aa4a91

Browse files
authored
Merge pull request #4251 from hove-io/timeout_when_direct_path_is_too_long
[Distributed] Timeout when direct path takes too long
2 parents 2bcdf5a + e9ef965 commit 1aa4a91

File tree

7 files changed

+46
-17
lines changed

7 files changed

+46
-17
lines changed

source/jormungandr/jormungandr/default_settings.py

+2
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,8 @@
355355

356356
GREENLET_POOL_SIZE = int(os.getenv('JORMUNGANDR_GEVENT_POOL_SIZE', 30))
357357

358+
DIRECT_PATH_TIMEOUT = int(os.getenv('JORMUNGANDR_DIRECT_PATH_TIMEOUT', 0.1))
359+
358360
PARSER_MAX_COUNT = int(os.getenv('JORMUNGANDR_PARSER_MAX_COUNT', 1000))
359361

360362
if boolean(os.getenv('JORMUNGANDR_DISABLE_SQLPOOLING', False)):

source/jormungandr/jormungandr/scenarios/distributed.py

+8
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,8 @@ def _compute_journeys(
190190
crowfly_distance = crowfly_distance_between(
191191
get_pt_object_coord(context.requested_orig_obj), get_pt_object_coord(context.requested_dest_obj)
192192
)
193+
194+
direct_path_timeout = app.config.get("DIRECT_PATH_TIMEOUT", 0.1)
193195
context.orig_proximities_by_crowfly = ProximitiesByCrowflyPool(
194196
future_manager=future_manager,
195197
instance=instance,
@@ -200,6 +202,7 @@ def _compute_journeys(
200202
max_nb_crowfly_by_mode=request['max_nb_crowfly_by_mode'],
201203
request_id="{}_crowfly_orig".format(request_id),
202204
o_d_crowfly_distance=crowfly_distance,
205+
direct_path_timeout=direct_path_timeout,
203206
)
204207

205208
context.dest_proximities_by_crowfly = ProximitiesByCrowflyPool(
@@ -212,6 +215,7 @@ def _compute_journeys(
212215
max_nb_crowfly_by_mode=request['max_nb_crowfly_by_mode'],
213216
request_id="{}_crowfly_dest".format(request_id),
214217
o_d_crowfly_distance=crowfly_distance,
218+
direct_path_timeout=direct_path_timeout,
215219
)
216220

217221
context.orig_places_free_access = PlacesFreeAccess(
@@ -240,6 +244,7 @@ def _compute_journeys(
240244
request=request,
241245
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
242246
request_id="{}_fallback_orig".format(request_id),
247+
direct_path_timeout=direct_path_timeout,
243248
)
244249

245250
context.dest_fallback_durations_pool = FallbackDurationsPool(
@@ -253,6 +258,7 @@ def _compute_journeys(
253258
request=request,
254259
direct_path_type=StreetNetworkPathType.ENDING_FALLBACK,
255260
request_id="{}_fallback_dest".format(request_id),
261+
direct_path_timeout=direct_path_timeout,
256262
)
257263

258264
pt_journey_pool = PtJourneyPool(
@@ -389,6 +395,7 @@ def _compute_isochrone_common(
389395
max_nb_crowfly_by_mode=request.get('max_nb_crowfly_by_mode', {}),
390396
request_id=request_id,
391397
o_d_crowfly_distance=None,
398+
direct_path_timeout=None,
392399
)
393400

394401
places_free_access = PlacesFreeAccess(
@@ -416,6 +423,7 @@ def _compute_isochrone_common(
416423
request=request,
417424
request_id=request_id,
418425
direct_path_type=direct_path_type,
426+
direct_path_timeout=None,
419427
)
420428

421429
# We don't need requested_orig_obj or requested_dest_obj for isochrone

source/jormungandr/jormungandr/scenarios/helper_classes/fallback_durations.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ def __init__(
499499
direct_paths_by_mode,
500500
request,
501501
request_id,
502+
direct_path_timeout,
502503
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
503504
):
504505
super(FallbackDurationsPool, self).__init__()
@@ -517,7 +518,7 @@ def __init__(
517518
self._request_id = request_id
518519

519520
self._overrided_uri_map = defaultdict(dict)
520-
self._async_request()
521+
self._async_request(direct_path_timeout)
521522

522523
@property
523524
def _overriding_mode_map(self):
@@ -538,10 +539,10 @@ def _overriding_mode_map(self):
538539
res[mode] = overriding_modes
539540
return res
540541

541-
def _async_request(self):
542+
def _async_request(self, direct_path_timeout):
542543
for mode in self._modes:
543544
max_fallback_duration = get_max_fallback_duration(
544-
self._request, mode, self._direct_paths_by_mode.get(mode)
545+
self._request, mode, self._direct_paths_by_mode.get(mode), direct_path_timeout
545546
)
546547
fallback_durations = FallbackDurations(
547548
self._future_manager,

source/jormungandr/jormungandr/scenarios/helper_classes/helper_utils.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ def _build_fallback(
683683
return pt_journey
684684

685685

686-
def get_max_fallback_duration(request, mode, dp_future):
686+
def get_max_fallback_duration(request, mode, dp_future, direct_path_timeout):
687687
"""
688688
By knowing the duration of direct path, we can limit the max duration for proximities by crowfly and fallback
689689
durations
@@ -694,7 +694,7 @@ def get_max_fallback_duration(request, mode, dp_future):
694694
"""
695695
# 30 minutes by default
696696
max_duration = request.get('max_{}_duration_to_pt'.format(mode), 1800)
697-
dp = dp_future.wait_and_get() if dp_future else None
697+
dp = dp_future.wait_and_get(timeout=direct_path_timeout) if dp_future else None
698698
dp_duration = dp.journeys[0].durations.total if getattr(dp, 'journeys', None) else max_duration
699699
return min(max_duration, dp_duration)
700700

source/jormungandr/jormungandr/scenarios/helper_classes/proximities_by_crowfly.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def __init__(
144144
max_nb_crowfly_by_mode,
145145
request_id,
146146
o_d_crowfly_distance,
147+
direct_path_timeout,
147148
):
148149
"""
149150
A ProximitiesByCrowflyPool is a set of ProximitiesByCrowfly grouped by mode
@@ -175,9 +176,9 @@ def __init__(
175176
self._value = {}
176177
self._request_id = request_id
177178
self._o_d_crowfly_distance = o_d_crowfly_distance
178-
self._async_request()
179+
self._async_request(direct_path_timeout)
179180

180-
def _async_request(self):
181+
def _async_request(self, direct_path_timeout):
181182

182183
for mode in self._modes:
183184
object_type = type_pb2.STOP_POINT
@@ -188,11 +189,15 @@ def _async_request(self):
188189
filter = "poi_type.uri=\"poi_type:amenity:parking\""
189190

190191
dp_future = self._direct_paths_by_mode.get(mode)
191-
max_fallback_duration = get_max_fallback_duration(self._request, mode, dp_future)
192+
max_fallback_duration = get_max_fallback_duration(
193+
self._request, mode, dp_future, direct_path_timeout
194+
)
192195
speed = jormungandr.street_network.utils.make_speed_switcher(self._request).get(mode)
193196

194197
no_dp = (
195-
dp_future is None or dp_future.wait_and_get() is None or not dp_future.wait_and_get().journeys
198+
dp_future is None
199+
or dp_future.wait_and_get(timeout=direct_path_timeout) is None
200+
or not dp_future.wait_and_get(timeout=direct_path_timeout).journeys
196201
)
197202

198203
if mode == fm.FallbackModes.car.name and no_dp and self._o_d_crowfly_distance is not None:

source/jormungandr/jormungandr/scenarios/helper_classes/streetnetwork_path.py

+20-7
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from jormungandr import utils, new_relic
3131
from jormungandr.street_network.street_network import StreetNetworkPathType
3232
import logging
33+
import gevent
3334
from .helper_utils import (
3435
timed_logger,
3536
prepend_first_coord,
@@ -223,15 +224,27 @@ def _async_request(self):
223224
for destination in self.get_pt_objects(self._dest_obj):
224225
self._futures.append(self._future_manager.create_future(self._do_request, origin, destination))
225226

226-
def wait_and_get(self):
227+
def wait_and_get(self, timeout=None):
227228

228-
best_res = min(
229-
(future.wait_and_get() for future in self._futures if future.wait_and_get().response is not None),
230-
key=lambda r: r.response.journeys[0].duration,
231-
default=None,
232-
)
229+
# timeout=None -> wait forever...
230+
timer = gevent.timeout.Timeout(timeout, exception=False)
231+
232+
best_res = None
233+
with timer:
234+
best_res = min(
235+
(
236+
future.wait_and_get()
237+
for future in self._futures
238+
if future.wait_and_get().response is not None
239+
),
240+
key=lambda r: r.response.journeys[0].duration,
241+
default=None,
242+
)
243+
244+
# if best_res is still None, that means timeout is triggered
233245
if best_res is None:
234-
return response_pb2.Response()
246+
self._logger.debug("time out in StreetNetworkPath")
247+
return None
235248

236249
if self._best_dp is None:
237250
dp = self.finalize_direct_path(best_res)

source/jormungandr/jormungandr/scenarios/new_default.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1042,7 +1042,7 @@ def merge_responses(responses, debug):
10421042

10431043
if not merged_response.journeys:
10441044
# we aggregate the errors found
1045-
errors = {r.error.id: r.error for r in responses if r.HasField(str('error'))}
1045+
errors = {r.error.id: r.error for r in responses if r and r.HasField(str('error'))}
10461046

10471047
# Only one errors field
10481048
if len(errors) == 1:

0 commit comments

Comments
 (0)