Skip to content

Commit dd2069f

Browse files
authored
Merge pull request #4213 from hove-io/transfer_path_cached
Adding cache for transfer_path
2 parents 69282c6 + 10d9226 commit dd2069f

File tree

7 files changed

+130
-35
lines changed

7 files changed

+130
-35
lines changed

source/jormungandr/jormungandr/default_settings.py

+3
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,11 @@
155155
'TIMEOUT_SYNTHESE': 30,
156156
'TIMEOUT_KRAKEN_COVERAGES': 60,
157157
'FETCH_S3_DATA_TIMEOUT': 24 * 60,
158+
# TIMEOUT_TRANSFER_PATH = 24Hours
159+
"TIMEOUT_TRANSFER_PATH": 24 * 60 * 60,
158160
}
159161

162+
160163
CACHE_CONFIGURATION = json.loads(os.getenv('JORMUNGANDR_CACHE_CONFIGURATION', '{}')) or default_cache
161164

162165

source/jormungandr/jormungandr/pt_planners/common.py

+12
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,15 @@ def get_odt_stop_points(pt_planner, coord, request_id):
165165
req.coord.lon = coord.lon
166166
req.coord.lat = coord.lat
167167
return pt_planner.send_and_receive(req, request_id=request_id).stop_points
168+
169+
170+
def get_stop_points_from_uri(pt_planner, uri, request_id, depth=0):
171+
req = request_pb2.Request()
172+
req.requested_api = type_pb2.PTREFERENTIAL
173+
req.ptref.requested_type = type_pb2.STOP_POINT
174+
req.ptref.count = 100
175+
req.ptref.start_page = 0
176+
req.ptref.depth = depth
177+
req.ptref.filter = 'stop_point.uri = {uri}'.format(uri=uri)
178+
result = pt_planner.send_and_receive(req, request_id=request_id)
179+
return result.stop_points

source/jormungandr/jormungandr/pt_planners/kraken.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
# www.navitia.io
2929
from __future__ import absolute_import, print_function, unicode_literals, division
3030

31-
from jormungandr.pt_planners.common import ZmqSocket, get_crow_fly, get_odt_stop_points
31+
from jormungandr.pt_planners.common import ZmqSocket, get_crow_fly, get_odt_stop_points, get_stop_points_from_uri
3232
from jormungandr import utils, app
3333
from .pt_planner import AbstractPtPlanner
3434
from navitiacommon import type_pb2
@@ -61,6 +61,17 @@ def graphical_isochrones(
6161
)
6262
return self.send_and_receive(req)
6363

64+
def get_access_points(self, pt_object, access_point_filter, request_id):
65+
stop_points = get_stop_points_from_uri(self, pt_object.uri, request_id, depth=2)
66+
if not stop_points:
67+
return None
68+
69+
return [
70+
type_pb2.PtObject(name=ap.name, uri=ap.uri, embedded_type=type_pb2.ACCESS_POINT, access_point=ap)
71+
for ap in stop_points[0].access_points
72+
if access_point_filter(ap)
73+
]
74+
6475
def get_crow_fly(
6576
self,
6677
origin,

source/jormungandr/jormungandr/pt_planners/loki.py

+7
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ def graphical_isochrones(
6262
):
6363
raise NotImplementedError("Too bad, you cannot ask loki for graphical isochrones :)")
6464

65+
def get_access_points(self, pt_object, access_point_filter, request_id):
66+
return [
67+
type_pb2.PtObject(name=ap.name, uri=ap.uri, embedded_type=type_pb2.ACCESS_POINT, access_point=ap)
68+
for ap in pt_object.stop_point.access_points
69+
if access_point_filter(ap)
70+
]
71+
6572
def get_crow_fly(
6673
self,
6774
origin,

source/jormungandr/jormungandr/pt_planners/pt_planner.py

+4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ def graphical_isochrones(
4646
):
4747
pass
4848

49+
@abstractmethod
50+
def get_access_points(self, pt_object, access_point_filter, request_id):
51+
raise NotImplementedError()
52+
4953

5054
class JourneyParameters(object):
5155
def __init__(

source/jormungandr/jormungandr/scenarios/distributed.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,11 @@ def finalise_journeys(self, future_manager, request, responses, context, instanc
318318
journeys_to_complete = get_journeys_to_complete(responses, context, is_debug)
319319

320320
transfer_pool = TransferPool(
321-
future_manager=future_manager, instance=instance, request=request, request_id=request_id
321+
future_manager=future_manager,
322+
instance=instance,
323+
request=request,
324+
request_id=request_id,
325+
pt_planner_name=request['_pt_planner'],
322326
)
323327

324328
pt_journey_fare_pool = PtJourneyFarePool(

source/jormungandr/jormungandr/scenarios/helper_classes/transfer.py

+87-33
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import copy
3232
from collections import namedtuple
3333
from navitiacommon import response_pb2, type_pb2
34+
from jormungandr import app, cache
3435
import itertools
3536
import logging
3637
from jormungandr.street_network.street_network import StreetNetworkPathType
@@ -59,6 +60,7 @@
5960
"physical_mode:Metro",
6061
)
6162

63+
6264
# if `(physical_mode:A, physical_mode:B) in NO_ACCESS_POINTS_TRANSFER` then it means that a transfer
6365
# where we get out of a vehicle of `physical_mode:A` and then get in a vehicle of `physical_mode:B`
6466
# **will not** go through an access point
@@ -74,17 +76,32 @@
7476
itertools.product(ACCESS_POINTS_PHYSICAL_MODES, NO_ACCESS_POINTS_PHYSICAL_MODES)
7577
) | set(itertools.product(NO_ACCESS_POINTS_PHYSICAL_MODES, ACCESS_POINTS_PHYSICAL_MODES))
7678

77-
7879
TransferResult = namedtuple('TransferResult', ['direct_path', 'origin', 'destination'])
7980

8081

82+
class TransferPathArgs:
83+
def __init__(self, section, prev_section_mode, next_section_mode):
84+
self.prev_section_mode = prev_section_mode
85+
self.next_section_mode = next_section_mode
86+
self.section = section
87+
88+
def __repr__(self):
89+
return "{origin}:{destination}:{prev_section_mode}:{next_section_mode}".format(
90+
origin=self.section.origin.uri,
91+
destination=self.section.destination.uri,
92+
prev_section_mode=self.prev_section_mode,
93+
next_section_mode=self.next_section_mode,
94+
)
95+
96+
8197
class TransferPool(object):
8298
def __init__(
8399
self,
84100
future_manager,
85101
instance,
86102
request,
87103
request_id,
104+
pt_planner_name,
88105
):
89106
self._future_manager = future_manager
90107
self._instance = instance
@@ -93,6 +110,16 @@ def __init__(
93110
self._streetnetwork_service = self._instance.get_street_network(FallbackModes.walking.name, request)
94111
self._transfers_future = dict()
95112
self._logger = logging.getLogger(__name__)
113+
self._pt_planner = self._instance.get_pt_planner(pt_planner_name)
114+
115+
def __repr__(self):
116+
return "{name}:{language}:{publication_date}".format(
117+
name=self._instance.name, language=self.language, publication_date=self._instance.publication_date
118+
)
119+
120+
@property
121+
def language(self):
122+
return self._request.get('language', "en-US")
96123

97124
def _make_sub_request_id(self, origin_uri, destination_uri):
98125
return "{}_transfer_{}_{}".format(self._request_id, origin_uri, destination_uri)
@@ -167,32 +194,27 @@ def _do_no_access_point_transfer(self, section):
167194
def _aysnc_no_access_point_transfer(self, section):
168195
return self._future_manager.create_future(self._do_no_access_point_transfer, section)
169196

170-
def _get_access_points(self, stop_point_uri, access_point_filter=lambda x: x):
171-
sub_request_id = "{}_transfer_start_{}".format(self._request_id, stop_point_uri)
172-
stop_points = self._instance.georef.get_stop_points_from_uri(stop_point_uri, sub_request_id, depth=2)
173-
if not stop_points:
174-
return None
175-
176-
return [
177-
type_pb2.PtObject(name=ap.name, uri=ap.uri, embedded_type=type_pb2.ACCESS_POINT, access_point=ap)
178-
for ap in stop_points[0].access_points
179-
if access_point_filter(ap)
180-
]
181-
182197
def get_underlying_access_points(self, section, prev_section_mode, next_section_mode):
183198
"""
184199
find out based on with extremity of the section the access points are calculated and request the georef for
185200
access_points of the underlying stop_point
186201
return: access_points
187202
"""
203+
188204
if prev_section_mode in ACCESS_POINTS_PHYSICAL_MODES:
189-
return self._get_access_points(
190-
section.origin.uri, access_point_filter=lambda access_point: access_point.is_exit
205+
sub_request_id = "{}_transfer_start_{}".format(self._request_id, section.origin.uri)
206+
return self._pt_planner.get_access_points(
207+
section.origin,
208+
access_point_filter=lambda access_point: access_point.is_exit,
209+
request_id=sub_request_id,
191210
)
192211

193212
if next_section_mode in ACCESS_POINTS_PHYSICAL_MODES:
194-
return self._get_access_points(
195-
section.destination.uri, access_point_filter=lambda access_point: access_point.is_entrance
213+
sub_request_id = "{}_transfer_start_{}".format(self._request_id, section.destination.uri)
214+
return self._pt_planner.get_access_points(
215+
section.destination,
216+
access_point_filter=lambda access_point: access_point.is_entrance,
217+
request_id=sub_request_id,
196218
)
197219

198220
return None
@@ -241,6 +263,7 @@ def determinate_the_best_access_point(routing_matrix, access_points):
241263
return best_access_point
242264

243265
def _get_transfer_result(self, section, origin, destination):
266+
244267
sub_request_id = self._make_sub_request_id(origin.uri, destination.uri)
245268
direct_path_type = StreetNetworkPathType.DIRECT
246269
extremity = PeriodExtremity(section.end_date_time, False)
@@ -255,36 +278,48 @@ def _get_transfer_result(self, section, origin, destination):
255278
sub_request_id,
256279
)
257280
if direct_path and direct_path.journeys:
258-
return TransferResult(direct_path, origin, destination)
259-
return None
260-
261-
def _do_access_point_transfer(self, section, prev_section_mode, next_section_mode):
262-
access_points = self.get_underlying_access_points(section, prev_section_mode, next_section_mode)
281+
return (
282+
direct_path.SerializeToString(),
283+
origin.SerializeToString(),
284+
destination.SerializeToString(),
285+
)
286+
return None, None, None
287+
288+
@cache.memoize(app.config[str('CACHE_CONFIGURATION')].get(str('TIMEOUT_TRANSFER_PATH'), 24 * 60 * 60))
289+
def get_cached_transfer_path(self, transfer_path_args):
290+
access_points = self.get_underlying_access_points(
291+
transfer_path_args.section,
292+
transfer_path_args.prev_section_mode,
293+
transfer_path_args.next_section_mode,
294+
)
263295
# if no access points are found for this stop point, which is supposed to have access points
264296
# we do nothing about the transfer path
265297
if not access_points:
266-
return None
298+
return None, None, None
267299

268300
origins, destinations = self.determinate_matrix_entry(
269-
section, access_points, prev_section_mode, next_section_mode
301+
transfer_path_args.section,
302+
access_points,
303+
transfer_path_args.prev_section_mode,
304+
transfer_path_args.next_section_mode,
270305
)
271306

272307
if len(origins) > 1 and len(destinations) > 1:
273308
self._logger.error(
274309
"Error occurred when computing transfer path both origin's and destination's sizes are larger than 1"
275310
)
276-
return None
311+
return None, None, None
277312

278313
if len(origins) == 1 and len(destinations) == 1:
279-
return self._get_transfer_result(section, origins[0], destinations[0])
314+
return self._get_transfer_result(transfer_path_args.section, origins[0], destinations[0])
280315

281316
sub_request_id = "{}_transfer_matrix".format(self._request_id)
282317
routing_matrix = self._streetnetwork_service.get_street_network_routing_matrix(
283318
self._instance,
284319
origins,
285320
destinations,
286321
FallbackModes.walking.name,
287-
section.duration * 3,
322+
transfer_path_args.section.duration * 3,
288323
self._request,
289324
sub_request_id,
290325
)
@@ -293,16 +328,36 @@ def _do_access_point_transfer(self, section, prev_section_mode, next_section_mod
293328
matrix.routing_status == response_pb2.unreached for matrix in routing_matrix.rows[0].routing_response
294329
):
295330
logging.getLogger(__name__).warning("no access points is reachable in transfer path computation")
296-
return None
331+
return None, None, None
297332

298333
# now it's time to find the best combo
299334
# (stop_point -> access_points or access_points -> stop_point)
300335
best_access_point = self.determinate_the_best_access_point(routing_matrix, access_points)
301336

302337
origin, destination = self.determinate_direct_path_entry(
303-
section, best_access_point, prev_section_mode, next_section_mode
338+
transfer_path_args.section,
339+
best_access_point,
340+
transfer_path_args.prev_section_mode,
341+
transfer_path_args.next_section_mode,
304342
)
305-
return self._get_transfer_result(section, origin, destination)
343+
return self._get_transfer_result(transfer_path_args.section, origin, destination)
344+
345+
def _do_access_point_transfer(self, section, prev_section_mode, next_section_mode):
346+
path, origin, destination = self.get_cached_transfer_path(
347+
TransferPathArgs(section, prev_section_mode, next_section_mode)
348+
)
349+
if not path:
350+
return None
351+
pb_path = response_pb2.Response()
352+
pb_path.ParseFromString(path)
353+
354+
pb_origin = type_pb2.PtObject()
355+
pb_origin.ParseFromString(origin)
356+
357+
pb_destination = type_pb2.PtObject()
358+
pb_destination.ParseFromString(destination)
359+
360+
return TransferResult(pb_path, pb_origin, pb_destination)
306361

307362
def _aysnc_access_point_transfer(self, section, prev_section_mode, next_section_mode):
308363
return self._future_manager.create_future(
@@ -351,22 +406,21 @@ def wait_and_complete(self, section):
351406

352407
# we assume here the transfer street network has only one section, which is in walking mode
353408
transfer_street_network = transfer_direct_path.journeys[0].sections[0].street_network
354-
language = self._request.get('language', "en-US")
355409

356410
if self._is_access_point(transfer_result.origin):
357411
prepend_path_item_with_access_point(
358412
transfer_street_network.path_items,
359413
section.origin.stop_point,
360414
transfer_result.origin.access_point,
361-
language,
415+
self.language,
362416
)
363417

364418
if self._is_access_point(transfer_result.destination):
365419
append_path_item_with_access_point(
366420
transfer_street_network.path_items,
367421
section.destination.stop_point,
368422
transfer_result.destination.access_point,
369-
language,
423+
self.language,
370424
)
371425

372426
section.street_network.CopyFrom(transfer_street_network)

0 commit comments

Comments
 (0)