Commit 27b1545

editoast: use gzip to compress cache data
Signed-off-by: Youness CHRIFI ALAOUI <[email protected]>
younesschrifi committed Jan 27, 2025
1 parent 4f2c918 commit 27b1545
Showing 5 changed files with 62 additions and 6 deletions.
1 change: 1 addition & 0 deletions editoast/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions editoast/Cargo.toml
@@ -130,6 +130,7 @@ editoast_schemas.workspace = true
 editoast_search = { workspace = true }
 enum-map.workspace = true
 enumset = "1.1.5"
+flate2 = "1.0.35"
 futures.workspace = true
 futures-util.workspace = true
 geos.workspace = true
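
For context, the only new dependency is flate2, which provides the GzEncoder/GzDecoder used by the cache helpers below. The following standalone sketch, which is not part of the commit, shows the round trip those helpers build on: serialize a value with serde_json through a gzip encoder, then read it back through a gzip decoder. The CachedValue type and the sample values are illustrative placeholders.

use flate2::read::GzDecoder;
use flate2::{write::GzEncoder, Compression};
use serde::{Deserialize, Serialize};

// Illustrative payload type, not part of the commit.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct CachedValue {
    id: u64,
    label: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let value = CachedValue { id: 42, label: "example".into() };

    // Compress: stream the JSON bytes through a gzip encoder into a buffer.
    let mut compressed = Vec::new();
    let mut encoder = GzEncoder::new(&mut compressed, Compression::fast());
    serde_json::to_writer(&mut encoder, &value)?;
    encoder.finish()?; // flush the gzip trailer before reading the buffer

    // Decompress: read the JSON back through a gzip decoder.
    let decoder = GzDecoder::new(compressed.as_slice());
    let roundtrip: CachedValue = serde_json::from_reader(decoder)?;
    assert_eq!(roundtrip, value);
    Ok(())
}

Note that Compression::fast(), also used by the new helpers, trades some compression ratio for lower CPU cost, a reasonable choice for cache writes.
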
58 changes: 56 additions & 2 deletions editoast/src/valkey_utils.rs
@@ -1,5 +1,7 @@
 use std::fmt::Debug;
 
+use flate2::read::GzDecoder;
+use flate2::{write::GzEncoder, Compression};
 use futures::future;
 use futures::FutureExt;
 use redis::aio::ConnectionLike;
@@ -119,7 +121,7 @@ impl ValkeyConnection {
     }
 
     /// Get a list of deserializable value from valkey
-    #[tracing::instrument(name = "cache:get_bulk", skip(self), err)]
+    #[tracing::instrument(name = "cache:json_get_bulk", skip(self), err)]
     pub async fn json_get_bulk<T: DeserializeOwned, K: Debug + ToRedisArgs + Send + Sync>(
         &mut self,
         keys: &[K],
@@ -165,7 +167,7 @@ impl ValkeyConnection {
     }
 
     /// Set a list of serializable values to valkey
-    #[tracing::instrument(name = "cache:set_bulk", skip(self, items), err)]
+    #[tracing::instrument(name = "cache:json_set_bulk", skip(self, items), err)]
     pub async fn json_set_bulk<K: Debug + ToRedisArgs + Send + Sync, T: Serialize>(
         &mut self,
         items: &[(K, T)],
@@ -192,6 +194,58 @@ impl ValkeyConnection {
         self.mset(&serialized_items).await?;
         Ok(())
     }
+
+    #[tracing::instrument(name = "cache:gzip_set_bulk", skip(self, items), err)]
+    pub async fn gzip_set_bulk<K: Debug + ToRedisArgs + Send + Sync, T: Serialize>(
+        &mut self,
+        items: &[(K, T)],
+    ) -> Result<()> {
+        // Avoid mset to fail if keys is empty
+        if items.is_empty() {
+            return Ok(());
+        }
+
+        let compressed_items = items
+            .iter()
+            .map(|(key, value)| {
+                let mut result = vec![];
+                let encoder = GzEncoder::new(&mut result, Compression::fast());
+                serde_json::to_writer(encoder, value)?;
+                Ok((key, result))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Store the compressed values using mset
+        self.mset(&compressed_items).await?;
+        Ok(())
+    }
+
+    #[tracing::instrument(name = "cache:gzip_get_bulk", skip(self), err)]
+    pub async fn gzip_get_bulk<K: Debug + ToRedisArgs + Send + Sync, T: DeserializeOwned>(
+        &mut self,
+        keys: &[K],
+    ) -> Result<Vec<Option<T>>> {
+        // Avoid mget to fail if keys is empty
+        if keys.is_empty() {
+            return Ok(vec![]);
+        }
+
+        // Fetch the values from Redis
+        let values: Vec<Option<Vec<u8>>> = self.mget(keys).await?;
+
+        // Decompress each value if it exists
+        values
+            .into_iter()
+            .map(|value| match value {
+                Some(compressed_data) => {
+                    let decoder = GzDecoder::new(&compressed_data[..]);
+                    let deserialized: T = serde_json::from_reader(decoder)?;
+                    Ok(Some(deserialized))
+                }
+                None => Ok(None),
+            })
+            .collect()
+    }
 }
 
 #[derive(Clone)]
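
A hypothetical caller of the two new helpers could look like the sketch below, mirroring the call sites updated in the next two files. ValkeyConnection, SimulationResponse, and the crate's Result alias are existing editoast types; cache_roundtrip and its parameters are placeholders invented for this illustration.

// Hypothetical sketch, not part of the commit: store a batch of serializable
// results under their keys, then read them back through the gzip helpers.
async fn cache_roundtrip(
    valkey_conn: &mut ValkeyConnection,
    items: &[(String, SimulationResponse)],
) -> Result<Vec<Option<SimulationResponse>>> {
    // One MSET: each value is serialized to JSON and gzip-compressed.
    valkey_conn.gzip_set_bulk(items).await?;

    // One MGET: missing keys come back as None, hits are decompressed
    // and deserialized.
    let keys: Vec<&String> = items.iter().map(|(key, _)| key).collect();
    valkey_conn.gzip_get_bulk(&keys).await
}
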
4 changes: 2 additions & 2 deletions editoast/src/views/path/pathfinding.rs
@@ -245,7 +245,7 @@ async fn pathfinding_blocks_batch(
 
     // Try to retrieve the result from Valkey
     let pathfinding_cached_results: Vec<Option<PathfindingResult>> =
-        valkey_conn.json_get_bulk(&hashes).await?;
+        valkey_conn.gzip_get_bulk(&hashes).await?;
     let pathfinding_cached_results: HashMap<_, _> =
         hashes.into_iter().zip(pathfinding_cached_results).collect();
 
@@ -325,7 +325,7 @@ async fn pathfinding_blocks_batch(
     }
 
     debug!(nb_cached = to_cache.len(), "Caching pathfinding response");
-    valkey_conn.json_set_bulk(&to_cache).await?;
+    valkey_conn.gzip_set_bulk(&to_cache).await?;
 
     Ok(pathfinding_results)
 }
4 changes: 2 additions & 2 deletions editoast/src/views/train_schedule.rs
@@ -515,7 +515,7 @@ pub async fn consist_train_simulation_batch(
     );
     let cached_simulation_hash = to_sim.keys().collect::<Vec<_>>();
     let cached_results: Vec<Option<SimulationResponse>> =
-        valkey_conn.json_get_bulk(&cached_simulation_hash).await?;
+        valkey_conn.gzip_get_bulk(&cached_simulation_hash).await?;
 
     let nb_hit = cached_results.iter().flatten().count();
     let nb_miss = to_sim.len() - nb_hit;
@@ -561,7 +561,7 @@ pub async fn consist_train_simulation_batch(
     }
 
     // Cache the simulation response
-    valkey_conn.json_set_bulk(&to_cache).await?;
+    valkey_conn.gzip_set_bulk(&to_cache).await?;
 
     // Return the response
     Ok(simulation_results
