Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Distributed] Timeout when direct path takes too long #4251

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/jormungandr/jormungandr/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@

GREENLET_POOL_SIZE = int(os.getenv('JORMUNGANDR_GEVENT_POOL_SIZE', 30))

DIRECT_PATH_TIMEOUT = int(os.getenv('JORMUNGANDR_DIRECT_PATH_TIMEOUT', 0.1))

PARSER_MAX_COUNT = int(os.getenv('JORMUNGANDR_PARSER_MAX_COUNT', 1000))

if boolean(os.getenv('JORMUNGANDR_DISABLE_SQLPOOLING', False)):
Expand Down
8 changes: 8 additions & 0 deletions source/jormungandr/jormungandr/scenarios/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ def _compute_journeys(
crowfly_distance = crowfly_distance_between(
get_pt_object_coord(context.requested_orig_obj), get_pt_object_coord(context.requested_dest_obj)
)

direct_path_timeout = app.config.get("DIRECT_PATH_TIMEOUT", 0.1)
context.orig_proximities_by_crowfly = ProximitiesByCrowflyPool(
future_manager=future_manager,
instance=instance,
Expand All @@ -200,6 +202,7 @@ def _compute_journeys(
max_nb_crowfly_by_mode=request['max_nb_crowfly_by_mode'],
request_id="{}_crowfly_orig".format(request_id),
o_d_crowfly_distance=crowfly_distance,
direct_path_timeout=direct_path_timeout,
)

context.dest_proximities_by_crowfly = ProximitiesByCrowflyPool(
Expand All @@ -212,6 +215,7 @@ def _compute_journeys(
max_nb_crowfly_by_mode=request['max_nb_crowfly_by_mode'],
request_id="{}_crowfly_dest".format(request_id),
o_d_crowfly_distance=crowfly_distance,
direct_path_timeout=direct_path_timeout,
)

context.orig_places_free_access = PlacesFreeAccess(
Expand Down Expand Up @@ -240,6 +244,7 @@ def _compute_journeys(
request=request,
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
request_id="{}_fallback_orig".format(request_id),
direct_path_timeout=direct_path_timeout,
)

context.dest_fallback_durations_pool = FallbackDurationsPool(
Expand All @@ -253,6 +258,7 @@ def _compute_journeys(
request=request,
direct_path_type=StreetNetworkPathType.ENDING_FALLBACK,
request_id="{}_fallback_dest".format(request_id),
direct_path_timeout=direct_path_timeout,
)

pt_journey_pool = PtJourneyPool(
Expand Down Expand Up @@ -389,6 +395,7 @@ def _compute_isochrone_common(
max_nb_crowfly_by_mode=request.get('max_nb_crowfly_by_mode', {}),
request_id=request_id,
o_d_crowfly_distance=None,
direct_path_timeout=None,
)

places_free_access = PlacesFreeAccess(
Expand Down Expand Up @@ -416,6 +423,7 @@ def _compute_isochrone_common(
request=request,
request_id=request_id,
direct_path_type=direct_path_type,
direct_path_timeout=None,
)

# We don't need requested_orig_obj or requested_dest_obj for isochrone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ def __init__(
direct_paths_by_mode,
request,
request_id,
direct_path_timeout,
direct_path_type=StreetNetworkPathType.BEGINNING_FALLBACK,
):
super(FallbackDurationsPool, self).__init__()
Expand All @@ -517,7 +518,7 @@ def __init__(
self._request_id = request_id

self._overrided_uri_map = defaultdict(dict)
self._async_request()
self._async_request(direct_path_timeout)

@property
def _overriding_mode_map(self):
Expand All @@ -538,10 +539,10 @@ def _overriding_mode_map(self):
res[mode] = overriding_modes
return res

def _async_request(self):
def _async_request(self, direct_path_timeout):
for mode in self._modes:
max_fallback_duration = get_max_fallback_duration(
self._request, mode, self._direct_paths_by_mode.get(mode)
self._request, mode, self._direct_paths_by_mode.get(mode), direct_path_timeout
)
fallback_durations = FallbackDurations(
self._future_manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ def _build_fallback(
return pt_journey


def get_max_fallback_duration(request, mode, dp_future):
def get_max_fallback_duration(request, mode, dp_future, direct_path_timeout):
"""
By knowing the duration of direct path, we can limit the max duration for proximities by crowfly and fallback
durations
Expand All @@ -694,7 +694,7 @@ def get_max_fallback_duration(request, mode, dp_future):
"""
# 30 minutes by default
max_duration = request.get('max_{}_duration_to_pt'.format(mode), 1800)
dp = dp_future.wait_and_get() if dp_future else None
dp = dp_future.wait_and_get(timeout=direct_path_timeout) if dp_future else None
dp_duration = dp.journeys[0].durations.total if getattr(dp, 'journeys', None) else max_duration
return min(max_duration, dp_duration)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def __init__(
max_nb_crowfly_by_mode,
request_id,
o_d_crowfly_distance,
direct_path_timeout,
):
"""
A ProximitiesByCrowflyPool is a set of ProximitiesByCrowfly grouped by mode
Expand Down Expand Up @@ -175,9 +176,9 @@ def __init__(
self._value = {}
self._request_id = request_id
self._o_d_crowfly_distance = o_d_crowfly_distance
self._async_request()
self._async_request(direct_path_timeout)

def _async_request(self):
def _async_request(self, direct_path_timeout):

for mode in self._modes:
object_type = type_pb2.STOP_POINT
Expand All @@ -188,11 +189,15 @@ def _async_request(self):
filter = "poi_type.uri=\"poi_type:amenity:parking\""

dp_future = self._direct_paths_by_mode.get(mode)
max_fallback_duration = get_max_fallback_duration(self._request, mode, dp_future)
max_fallback_duration = get_max_fallback_duration(
self._request, mode, dp_future, direct_path_timeout
)
speed = jormungandr.street_network.utils.make_speed_switcher(self._request).get(mode)

no_dp = (
dp_future is None or dp_future.wait_and_get() is None or not dp_future.wait_and_get().journeys
dp_future is None
or dp_future.wait_and_get(timeout=direct_path_timeout) is None
or not dp_future.wait_and_get(timeout=direct_path_timeout).journeys
)

if mode == fm.FallbackModes.car.name and no_dp and self._o_d_crowfly_distance is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from jormungandr import utils, new_relic
from jormungandr.street_network.street_network import StreetNetworkPathType
import logging
import gevent
from .helper_utils import (
timed_logger,
prepend_first_coord,
Expand Down Expand Up @@ -223,15 +224,27 @@ def _async_request(self):
for destination in self.get_pt_objects(self._dest_obj):
self._futures.append(self._future_manager.create_future(self._do_request, origin, destination))

def wait_and_get(self):
def wait_and_get(self, timeout=None):

best_res = min(
(future.wait_and_get() for future in self._futures if future.wait_and_get().response is not None),
key=lambda r: r.response.journeys[0].duration,
default=None,
)
# timeout=None -> wait forever...
timer = gevent.timeout.Timeout(timeout, exception=False)

best_res = None
with timer:
best_res = min(
(
future.wait_and_get()
for future in self._futures
if future.wait_and_get().response is not None
),
key=lambda r: r.response.journeys[0].duration,
default=None,
)

# if best_res is still None, that means timeout is triggered
if best_res is None:
return response_pb2.Response()
self._logger.debug("time out in StreetNetworkPath")
return None

if self._best_dp is None:
dp = self.finalize_direct_path(best_res)
Expand Down
2 changes: 1 addition & 1 deletion source/jormungandr/jormungandr/scenarios/new_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ def merge_responses(responses, debug):

if not merged_response.journeys:
# we aggregate the errors found
errors = {r.error.id: r.error for r in responses if r.HasField(str('error'))}
errors = {r.error.id: r.error for r in responses if r and r.HasField(str('error'))}

# Only one errors field
if len(errors) == 1:
Expand Down
Loading