[Kraken] Fix aging problem when both kirin & chaos are active #3909

Merged: 8 commits, merged on Jan 18, 2023

Changes from 2 commits
23 changes: 14 additions & 9 deletions source/kraken/apply_disruption.cpp
@@ -98,6 +98,14 @@ nt::VehicleJourney* create_vj_from_old_vj(nt::MetaVehicleJourney* mvj,
     return new_vj;
 }
 
+std::string make_new_vj_uri(const std::string& mvj_uri,
+                            nt::RTLevel rt_level,
+                            int nb_rt_vj,
+                            const std::string& disruption_uri) {
+    return "vehicle_journey:" + mvj_uri + ":" + type::get_string_from_rt_level(rt_level) + ":"
+           + std::to_string(nb_rt_vj) + ":" + disruption_uri;
+}
+
 struct apply_impacts_visitor : public boost::static_visitor<> {
     boost::shared_ptr<nt::disruption::Impact> impact;
     nt::PT_Data& pt_data;
@@ -152,7 +160,7 @@ struct apply_impacts_visitor : public boost::static_visitor<> {
         // we cannot ensure that all VJ of a MetaVJ are on the same route,
         // and since we want all actions to operate on MetaVJ, we collect all MetaVJ of the route
         // (but we'll change only the route's vj)
-        std::set<nt::MetaVehicleJourney*> mvjs;
+        std::unordered_set<nt::MetaVehicleJourney*> mvjs;
         route->for_each_vehicle_journey([&mvjs](nt::VehicleJourney& vj) {
             mvjs.insert(vj.meta_vj);
             return true;
@@ -318,8 +326,8 @@ struct add_impacts_visitor : public apply_impacts_visitor {
         }
 
         auto nb_rt_vj = mvj->get_rt_vj().size();
-        std::string new_vj_uri =
-            "vehicle_journey:" + mvj->uri + ":modified:" + std::to_string(nb_rt_vj) + ":" + impact->disruption->uri;
+        auto new_vj_uri = make_new_vj_uri(mvj->uri, rt_level, nb_rt_vj, impact->disruption->uri);
 
         std::vector<type::StopTime> stoptimes;  // we copy all the stoptimes
         for (const auto& stu : impact->aux_info.stop_times) {
             stoptimes.push_back(stu.stop_time);
@@ -491,8 +499,7 @@ struct add_impacts_visitor : public apply_impacts_visitor {
                 continue;
             }
             auto nb_rt_vj = mvj->get_vjs_at(rt_level).size();
-            std::string new_vj_uri = vj->uri + ":" + type::get_string_from_rt_level(rt_level) + ":"
-                                     + std::to_string(nb_rt_vj) + ":" + impact->disruption->uri;
+            auto new_vj_uri = make_new_vj_uri(mvj->uri, rt_level, nb_rt_vj, impact->disruption->uri);
 
             new_vp.days = new_vp.days & (vj->validity_patterns[rt_level]->days >> vj->shift);
 
@@ -560,8 +567,7 @@ struct add_impacts_visitor : public apply_impacts_visitor {
                 continue;
             }
             auto nb_rt_vj = mvj->get_vjs_at(rt_level).size();
-            std::string new_vj_uri = vj->uri + ":" + type::get_string_from_rt_level(rt_level) + ":"
-                                     + std::to_string(nb_rt_vj) + ":" + impact->disruption->uri;
+            auto new_vj_uri = make_new_vj_uri(mvj->uri, rt_level, nb_rt_vj, impact->disruption->uri);
 
             new_vp.days = new_vp.days & (vj->validity_patterns[rt_level]->days >> vj->shift);
 
@@ -716,8 +722,7 @@ struct add_impacts_visitor : public apply_impacts_visitor {
             mvj->push_unique_impact(impact);
 
             auto nb_rt_vj = mvj->get_vjs_at(rt_level).size();
-            std::string new_vj_uri = "vehicle_journey:" + mvj->uri + ":" + type::get_string_from_rt_level(rt_level)
-                                     + ":" + std::to_string(nb_rt_vj) + concatenate_impact_uris(*mvj);
+            auto new_vj_uri = make_new_vj_uri(mvj->uri, rt_level, nb_rt_vj, impact->disruption->uri);
 
             new_vp.days = new_vp.days & (vj->validity_patterns[rt_level]->days >> vj->shift);
 
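A plausible reading of the refactor above (the diff itself doesn't spell it out): two of the old call sites seeded the new URI from vj->uri, which for a realtime VJ is itself an already generated "vehicle_journey:..." URI, and one appended concatenate_impact_uris(*mvj), so URIs could keep growing while kirin and chaos updates alternated on the same MetaVJ. make_new_vj_uri now seeds every call site from the stable MetaVJ URI. An illustrative, self-contained sketch of the difference; all helper names and URI values below are made up:

#include <iostream>
#include <string>

// old_style mimics seeding from the previous VJ's uri: each update builds on
// the last generated uri, so the string grows with every realtime update.
std::string old_style(const std::string& base_vj_uri, int n, const std::string& disruption) {
    return base_vj_uri + ":realtime:" + std::to_string(n) + ":" + disruption;
}

// new_style mimics make_new_vj_uri: always derived from the stable MetaVJ uri.
std::string new_style(const std::string& mvj_uri, int n, const std::string& disruption) {
    return "vehicle_journey:" + mvj_uri + ":realtime:" + std::to_string(n) + ":" + disruption;
}

int main() {
    std::string vj_uri = "vehicle_journey:vj:1";
    vj_uri = old_style(vj_uri, 0, "d1");  // vehicle_journey:vj:1:realtime:0:d1
    vj_uri = old_style(vj_uri, 1, "d2");  // vehicle_journey:vj:1:realtime:0:d1:realtime:1:d2
    std::cout << vj_uri << "\n";          // keeps growing with each update

    std::cout << new_style("mvj:1", 1, "d2") << "\n";  // vehicle_journey:mvj:1:realtime:1:d2
}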
12 changes: 11 additions & 1 deletion source/kraken/configuration.cpp
@@ -92,7 +92,9 @@ po::options_description get_options_description(const boost::optional<std::strin
     ("BROKER.vhost", po::value<std::string>()->default_value("/"), "vhost for rabbitmq")
     ("BROKER.exchange", po::value<std::string>()->default_value("navitia"), "exchange used in rabbitmq")
     ("BROKER.rt_topics", po::value<std::vector<std::string>>(), "list of realtime topics for this instance")
-    ("BROKER.timeout", po::value<int>()->default_value(100), "timeout for maintenance worker, in milliseconds")
+    ("BROKER.timeout", po::value<int>()->default_value(200), "timeout for maintenance worker, in milliseconds")
+    ("BROKER.max_batch_nb", po::value<int>()->default_value(5000), "max number of realtime messages consumed in one batch")
+    ("BROKER.retrieving_timeout", po::value<int>()->default_value(10000), "max duration, in milliseconds, the worker may spend retrieving messages")
     ("BROKER.sleeptime", po::value<int>()->default_value(1), "sleeptime for maintenance worker, in seconds")
     ("BROKER.reconnect_wait", po::value<int>()->default_value(1), "wait duration between connection attempts to rabbitmq, in seconds")
     ("BROKER.queue", po::value<std::string>(), "rabbitmq's queue name to be bound")
@@ -232,6 +234,14 @@ int Configuration::broker_timeout() const {
     return vm["BROKER.timeout"].as<int>();
 }
 
+int Configuration::retrieving_timeout() const {
+    return vm["BROKER.retrieving_timeout"].as<int>();
+}
+
+int Configuration::broker_max_batch_nb() const {
+    return vm["BROKER.max_batch_nb"].as<int>();
+}
+
 int Configuration::broker_sleeptime() const {
     return vm["BROKER.sleeptime"].as<int>();
 }
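For operators, a hypothetical kraken INI excerpt wiring up the new knobs. The key names and defaults come from get_options_description above; the file layout and the surrounding host/port keys are illustrative assumptions, not taken from this PR:

[BROKER]
host = localhost
port = 5672
# per-message polling timeout for the maintenance worker, in milliseconds
# (default raised from 100 to 200 in this PR)
timeout = 200
# new in this PR: cap on the number of realtime messages consumed in one batch
max_batch_nb = 5000
# new in this PR: upper bound, in milliseconds, on the time spent retrieving one batch
retrieving_timeout = 10000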
2 changes: 2 additions & 0 deletions source/kraken/configuration.h
@@ -64,6 +64,8 @@ class Configuration {
     bool broker_queue_auto_delete() const;
     int broker_queue_expire() const;
     int broker_timeout() const;
+    int retrieving_timeout() const;
+    int broker_max_batch_nb() const;
     int broker_sleeptime() const;
     int broker_reconnect_wait() const;
     bool is_realtime_enabled() const;
17 changes: 14 additions & 3 deletions source/kraken/maintenance_worker.cpp
@@ -43,6 +43,7 @@ www.navitia.io
 #include <boost/algorithm/string/join.hpp>
 #include <boost/optional.hpp>
 #include <boost/thread/thread.hpp>
+#include <boost/range/adaptor/reversed.hpp>
 
 #include <chrono>
 #include <csignal>
@@ -364,7 +365,9 @@ void MaintenanceWorker::handle_rt_in_batch(const std::vector<AmqpClient::Envelop
     pt::ptime begin = pt::microsec_clock::universal_time();
     bool autocomplete_rebuilding_activated = false;
     auto rt_action = RTAction::chaos;
-    for (auto& envelope : envelopes) {
+
+    std::unordered_set<std::string> applied_visited_id;
+    for (auto& envelope : boost::adaptors::reverse(envelopes)) {
         const auto routing_key = envelope->RoutingKey();
         LOG4CPLUS_DEBUG(logger, "realtime info received from " << routing_key);
         assert(envelope);
@@ -375,6 +378,10 @@
         }
         LOG4CPLUS_TRACE(logger, "received entity: " << feed_message.DebugString());
         for (const auto& entity : feed_message.entity()) {
+            auto res = applied_visited_id.insert(entity.id());
+            if (!res.second) {
+                continue;
+            }
Contributor:

Suggested change:
     auto res = applied_visited_id.insert(entity.id());
+    // a newer disruption with the same id has already been seen, so we can ignore this one
     if (!res.second) {
         continue;
     }
I am a bit worried about this "ignore disruptions whose id has already been seen":

  • Do we receive kirin disruptions with the same id? When that happens, are we sure we only need to take the last one into account? Can we have two disruptions with the same id that affect different VJs? poke @pbougue
  • What about chaos disruptions? In particular, when we cancel a disruption, don't we get the same id twice?

pbougue (Contributor), Jan 18, 2023:
This may deserve a code comment, as the three of us asked ourselves the same question 😃

  • For kirin, it is guaranteed that the entity ID is the same only when the VJ is the same, and that taking only the last message in the queue is valid.
    Details: this is actually less clear-cut with parallelism on kirin, but the guarantee is that it will be as good as it was (nothing else can currently be used to decide which message is the last one). Actually, even with parallelism it is guaranteed (there is no concurrent processing of the same VJ in Kirin).
    💡 This may deserve a comment in https://github.com/hove-io/chaos-proto/blob/master/kirin_proto_doc.rs ? (I can have a shot at it if you agree.)

  • For chaos, this was discussed with the chaos team and seemed OK, but I don't know about that particular case; I'll let @xlqian reply if he knows.

pbench (Contributor), Jan 18, 2023:
Alright, if this has been checked it sounds good to me.
I do agree that some comments (here and in the proto doc) explaining this would be useful.

> For kirin, it is guaranteed that the entity ID is the same only when the VJ is the same

Same VJ and same date?

Contributor:
Yes, same VJ and same date 👍

✅ I'll try to add a little something into kirin_proto_doc.rs

xlqian (Member, Author), Jan 18, 2023:
@pbench Now I'm having second thoughts because of your comment, and after thinking it through I believe I made a mistake...
@pbougue I'm going to remove this trick (reversing the vector) from this PR and open another PR to tackle the problem, so that if I messed it up we don't have to revert the whole thing.

Contributor:
We also have to reverse the order of the entities within each message to be sure we still get the same result (although it would be weird for someone to send the same entity multiple times in one message).

Contributor:
After discussion: this was reverted from this PR because of uncertainty around chaos.
Tracked in JIRA https://navitia.atlassian.net/browse/NAV-1878

         if (!data) {
             pt::ptime copy_begin = pt::microsec_clock::universal_time();
             data = data_manager.get_data_clone();
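For readers following the thread above: the (later reverted) deduplication walks the batch newest-first and keeps only the first occurrence of each entity id, i.e. a last-wins policy. A minimal self-contained sketch of that idea, using made-up entity and payload types rather than kraken's protobuf ones:

#include <boost/range/adaptor/reversed.hpp>
#include <iostream>
#include <string>
#include <unordered_set>
#include <vector>

struct Entity {
    std::string id;
    std::string payload;
};

int main() {
    // Oldest to newest, in the order they were consumed from the queue.
    std::vector<Entity> entities = {
        {"vj:1", "delay 5 min"}, {"vj:2", "cancel"}, {"vj:1", "delay 10 min"}};

    std::unordered_set<std::string> applied_visited_id;
    // Walk newest-first: the first time an id shows up is its most recent update,
    // so any later (i.e. older) occurrence of the same id is skipped.
    for (const auto& entity : boost::adaptors::reverse(entities)) {
        if (!applied_visited_id.insert(entity.id).second) {
            continue;  // an older update for an id that was already applied
        }
        std::cout << entity.id << " -> " << entity.payload << "\n";
    }
    // Prints "vj:1 -> delay 10 min" then "vj:2 -> cancel"; the older
    // "delay 5 min" update for vj:1 is ignored.
}

Applying newest-first is only safe if a later message fully supersedes an earlier one with the same id, which is exactly the kirin guarantee discussed above and the open question for chaos.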
@@ -452,7 +459,11 @@ std::vector<AmqpClient::Envelope::ptr_t> MaintenanceWorker::consume_in_batch(con
     std::vector<AmqpClient::Envelope::ptr_t> envelopes;
     envelopes.reserve(max_nb);
     size_t consumed_nb = 0;
-    while (consumed_nb < max_nb) {
+    auto begin = pt::microsec_clock::universal_time();
+
+    auto retrieving_timeout = conf.retrieving_timeout();
+    while (consumed_nb < max_nb
+           && (pt::microsec_clock::universal_time() - begin).total_milliseconds() < retrieving_timeout) {
         AmqpClient::Envelope::ptr_t envelope{};
 
         /* !
@@ -499,7 +510,7 @@ void MaintenanceWorker::listen_rabbitmq() {
 
     // Arbitrary number: we suppose that disruptions can be handled very quickly, so that,
     // in theory, we can also handle a batch of 5000 disruptions very quickly.
-    size_t max_batch_nb = 5000;
+    size_t max_batch_nb = conf.broker_max_batch_nb();
 
     try {
         auto rt_envelopes = consume_in_batch(rt_tag, max_batch_nb, timeout_ms, no_ack);
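A plausible back-of-the-envelope reading of how the new bound limits the "aging" in the PR title, assuming each consume call in the loop may wait up to BROKER.timeout for one message:

  • Before: the batch loop stops only after max_batch_nb messages, so with a slow but steady producer it could spin for up to 5000 × 200 ms ≈ 16 minutes (with the new defaults) before anything is applied, while the loaded data keeps aging.
  • After: the same loop is additionally capped by BROKER.retrieving_timeout (10 000 ms by default), so a batch is handed to handle_rt_in_batch at least every ~10 seconds regardless of how messages trickle in.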