Skip to content

Commit

Permalink
editoast: use zstd
Browse files Browse the repository at this point in the history
Signed-off-by: Youness CHRIFI ALAOUI <[email protected]>
  • Loading branch information
younesschrifi committed Jan 31, 2025
1 parent 27b1545 commit ec3675f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 28 deletions.
21 changes: 20 additions & 1 deletion editoast/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion editoast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ editoast_schemas.workspace = true
editoast_search = { workspace = true }
enum-map.workspace = true
enumset = "1.1.5"
flate2 = "1.0.35"
futures.workspace = true
futures-util.workspace = true
geos.workspace = true
Expand All @@ -150,6 +149,7 @@ json-patch = { version = "3.0.1", default-features = false, features = [
"utoipa",
] }
lapin = "2.5.0"
lz4_flex = {version = "0.11.3", default-features = false, features = ["frame"] }
mime = "0.3.17"
mvt.workspace = true
opentelemetry.workspace = true
Expand Down
62 changes: 37 additions & 25 deletions editoast/src/valkey_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::fmt::Debug;

use flate2::read::GzDecoder;
use flate2::{write::GzEncoder, Compression};
use futures::future;
use futures::FutureExt;
use redis::aio::ConnectionLike;
Expand All @@ -18,7 +16,7 @@ use redis::RedisResult;
use redis::ToRedisArgs;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tracing::trace;
use tracing::{debug, span, trace, Level};
use url::Url;

use crate::error::Result;
Expand Down Expand Up @@ -205,18 +203,27 @@ impl ValkeyConnection {
return Ok(());
}

let compressed_items = items
.iter()
.map(|(key, value)| {
let mut result = vec![];
let encoder = GzEncoder::new(&mut result, Compression::fast());
serde_json::to_writer(encoder, value)?;
Ok((key, result))
})
.collect::<Result<Vec<_>>>()?;
let compressed_items = span!(Level::INFO, "Compressing data").in_scope(|| {
items
.iter()
.map(|(key, value)| {
let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
serde_json::to_writer(&mut encoder, value)?;
let compressed_value = encoder.finish().map_err(|_| {
RedisError::from((
ErrorKind::IoError,
"An error occured compressing the value",
))
})?;
Ok((key, compressed_value))
})
.collect::<Result<Vec<_>>>()
})?;

// Store the compressed values using mset
self.mset(&compressed_items).await?;
span!(Level::INFO, "Sending items to Redis")
.in_scope(|| async move { self.mset(&compressed_items).await })
.await?;
Ok(())
}

Expand All @@ -229,22 +236,27 @@ impl ValkeyConnection {
if keys.is_empty() {
return Ok(vec![]);
}
debug!(nb_keys = keys.len());

// Fetch the values from Redis
let values: Vec<Option<Vec<u8>>> = self.mget(keys).await?;
let values = span!(Level::INFO, "Fetching values from Redis")
.in_scope(|| async move { self.mget::<_, Vec<Option<Vec<u8>>>>(keys).await })
.await?;

// Decompress each value if it exists
values
.into_iter()
.map(|value| match value {
Some(compressed_data) => {
let decoder = GzDecoder::new(&compressed_data[..]);
let deserialized: T = serde_json::from_reader(decoder)?;
Ok(Some(deserialized))
}
None => Ok(None),
})
.collect()
span!(Level::INFO, "Decompressing data").in_scope(|| {
values
.into_iter()
.map(|value| match value {
Some(compressed_data) => {
let mut decoder = lz4_flex::frame::FrameDecoder::new(&compressed_data[..]);
let deserialized: T = serde_json::from_reader(&mut decoder)?;
Ok(Some(deserialized))
}
None => Ok(None),
})
.collect()
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion editoast/src/views/train_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ editoast_common::schemas! {
projection::schemas(),
}

pub const TRAIN_SIZE_BATCH: usize = 500;
pub const TRAIN_SIZE_BATCH: usize = 250;

#[derive(Debug, Error, EditoastError)]
#[editoast_error(base_id = "train_schedule")]
Expand Down

0 comments on commit ec3675f

Please sign in to comment.