Skip to content

Commit 928ae02

Browse files
authored
Merge pull request #4241 from hove-io/parallel_calling_asgard_when_using_poi_access_points
[Distributed] Parallel calling asgard when using poi access points
2 parents 6dd6fd4 + 2fba3ec commit 928ae02

File tree

1 file changed

+43
-37
lines changed

1 file changed

+43
-37
lines changed

source/jormungandr/jormungandr/scenarios/helper_classes/streetnetwork_path.py

+43-37
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,10 @@ def __init__(
8888
self._fallback_extremity = fallback_extremity
8989
self._request = request
9090
self._path_type = streetnetwork_path_type
91-
self._value = None
91+
self._futures = []
9292
self._logger = logging.getLogger(__name__)
9393
self._request_id = request_id
94+
self._best_dp = None
9495
self._async_request()
9596

9697
@staticmethod
@@ -140,35 +141,26 @@ def finalize_direct_path(self, resp_direct_path):
140141
self.make_poi_access_points(StreetNetworkPathType.ENDING_FALLBACK, resp_direct_path)
141142
return resp_direct_path.response
142143

143-
def build_direct_path(self):
144-
best_direct_path = None
145-
for origin in self.get_pt_objects(self._orig_obj):
146-
for destination in self.get_pt_objects(self._dest_obj):
147-
response = self._streetnetwork_service.direct_path_with_fp(
148-
self._instance,
149-
self._mode,
150-
origin,
151-
destination,
152-
self._fallback_extremity,
153-
self._request,
154-
self._path_type,
155-
self._request_id,
156-
)
157-
if not is_valid_direct_path(response):
158-
continue
159-
if not best_direct_path:
160-
best_direct_path = Dp_element(origin, destination, response)
161-
elif (
162-
response.journeys[0].durations.total < best_direct_path.response.journeys[0].durations.total
163-
):
164-
best_direct_path = Dp_element(origin, destination, response)
165-
return self.finalize_direct_path(best_direct_path)
144+
def build_direct_path(self, origin, destination):
145+
response = self._streetnetwork_service.direct_path_with_fp(
146+
self._instance,
147+
self._mode,
148+
origin,
149+
destination,
150+
self._fallback_extremity,
151+
self._request,
152+
self._path_type,
153+
self._request_id,
154+
)
155+
if is_valid_direct_path(response):
156+
return response
157+
return None
166158

167159
@new_relic.distributedEvent("direct_path", "street_network")
168-
def _direct_path_with_fp(self):
160+
def _direct_path_with_fp(self, origin, destination):
169161
with timed_logger(self._logger, 'direct_path_calling_external_service', self._request_id):
170162
try:
171-
return self.build_direct_path()
163+
return self.build_direct_path(origin, destination)
172164
except GeoveloTechnicalError as e:
173165
logging.getLogger(__name__).exception('')
174166
raise StreetNetworkException(response_pb2.Error.service_unavailable, e.data["message"])
@@ -202,7 +194,7 @@ def get_pt_object_destination(self, dp):
202194
return self.poi_to_pt_object(poi_access_point)
203195
return None
204196

205-
def _do_request(self):
197+
def _do_request(self, origin, destination):
206198
self._logger.debug(
207199
"requesting %s direct path from %s to %s by %s",
208200
self._path_type,
@@ -211,11 +203,7 @@ def _do_request(self):
211203
self._mode,
212204
)
213205

214-
dp = self._direct_path_with_fp(self._streetnetwork_service)
215-
origin = self.get_pt_object_origin(dp)
216-
destination = self.get_pt_object_destination(dp)
217-
prepend_first_coord(dp, origin)
218-
append_last_coord(dp, destination)
206+
dp = self._direct_path_with_fp(self._streetnetwork_service, origin, destination)
219207

220208
if getattr(dp, "journeys", None):
221209
dp.journeys[0].internal_id = str(utils.generate_id())
@@ -227,15 +215,33 @@ def _do_request(self):
227215
self._dest_obj.uri,
228216
self._mode,
229217
)
230-
return dp
218+
return Dp_element(origin, destination, dp)
231219

232220
def _async_request(self):
233-
self._value = self._future_manager.create_future(self._do_request)
221+
self._futures = []
222+
for origin in self.get_pt_objects(self._orig_obj):
223+
for destination in self.get_pt_objects(self._dest_obj):
224+
self._futures.append(self._future_manager.create_future(self._do_request, origin, destination))
234225

235226
def wait_and_get(self):
236-
if self._value:
237-
return self._value.wait_and_get()
238-
return None
227+
228+
best_res = min(
229+
(future.wait_and_get() for future in self._futures if future.wait_and_get().response is not None),
230+
key=lambda r: r.response.journeys[0].duration,
231+
default=None,
232+
)
233+
if best_res is None:
234+
return response_pb2.Response()
235+
236+
if self._best_dp is None:
237+
dp = self.finalize_direct_path(best_res)
238+
origin = self.get_pt_object_origin(dp)
239+
destination = self.get_pt_object_destination(dp)
240+
prepend_first_coord(dp, origin)
241+
append_last_coord(dp, destination)
242+
self._best_dp = dp
243+
244+
return self._best_dp
239245

240246

241247
class StreetNetworkPathPool:

0 commit comments

Comments
 (0)