Skip to content

Commit cbbd086

Browse files
authored
feat: Add more fields in FileScanTask (apache#609)
Signed-off-by: Xuanwo <[email protected]>
1 parent 620d58e commit cbbd086

File tree

3 files changed

+57
-43
lines changed

crates/iceberg/src/arrow/reader.rs

+6 -6
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,7 @@ impl ArrowReader {
133133
|(file_scan_task, file_io, tx)| async move {
134134
match file_scan_task {
135135
Ok(task) => {
136-
let file_path = task.data_file_path().to_string();
136+
let file_path = task.data_file_path.to_string();
137137

138138
spawn(async move {
139139
Self::process_file_scan_task(
@@ -171,7 +171,7 @@ impl ArrowReader {
171171
) -> Result<()> {
172172
// Get the metadata for the Parquet file we need to read and build
173173
// a reader for the data within
174-
let parquet_file = file_io.new_input(task.data_file_path())?;
174+
let parquet_file = file_io.new_input(&task.data_file_path)?;
175175
let (parquet_metadata, parquet_reader) =
176176
try_join!(parquet_file.metadata(), parquet_file.reader())?;
177177
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
@@ -187,8 +187,8 @@ impl ArrowReader {
187187
// Create a projection mask for the batch stream to select which columns in the
188188
// Parquet file that we want in the response
189189
let projection_mask = Self::get_arrow_projection_mask(
190-
task.project_field_ids(),
191-
task.schema(),
190+
&task.project_field_ids,
191+
&task.schema,
192192
record_batch_stream_builder.parquet_schema(),
193193
record_batch_stream_builder.schema(),
194194
)?;
@@ -198,7 +198,7 @@ impl ArrowReader {
198198
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
199199
}
200200

201-
if let Some(predicate) = task.predicate() {
201+
if let Some(predicate) = &task.predicate {
202202
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
203203
record_batch_stream_builder.parquet_schema(),
204204
predicate,
@@ -218,7 +218,7 @@ impl ArrowReader {
218218
predicate,
219219
record_batch_stream_builder.metadata(),
220220
&field_id_map,
221-
task.schema(),
221+
&task.schema,
222222
)?;
223223

224224
selected_row_groups = Some(result);

crates/iceberg/src/scan.rs

+41 -35
Original file line number | Diff line number | Diff line change
@@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache;
3636
use crate::io::FileIO;
3737
use crate::runtime::spawn;
3838
use crate::spec::{
39-
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
40-
SchemaRef, SnapshotRef, TableMetadataRef,
39+
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
40+
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
4141
};
4242
use crate::table::Table;
4343
use crate::utils::available_parallelism;
@@ -529,14 +529,19 @@ impl ManifestEntryContext {
529529
/// created from it
530530
fn into_file_scan_task(self) -> FileScanTask {
531531
FileScanTask {
532-
data_file_path: self.manifest_entry.file_path().to_string(),
533532
start: 0,
534533
length: self.manifest_entry.file_size_in_bytes(),
534+
record_count: Some(self.manifest_entry.record_count()),
535+
536+
data_file_path: self.manifest_entry.file_path().to_string(),
537+
data_file_content: self.manifest_entry.content_type(),
538+
data_file_format: self.manifest_entry.file_format(),
539+
540+
schema: self.snapshot_schema,
535541
project_field_ids: self.field_ids.to_vec(),
536542
predicate: self
537543
.bound_predicates
538544
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
539-
schema: self.snapshot_schema,
540545
}
541546
}
542547
}
@@ -854,35 +859,30 @@ impl ExpressionEvaluatorCache {
854859
/// A task to scan part of file.
855860
#[derive(Debug, Clone, Serialize, Deserialize)]
856861
pub struct FileScanTask {
857-
data_file_path: String,
858-
start: u64,
859-
length: u64,
860-
project_field_ids: Vec<i32>,
862+
/// The start offset of the file to scan.
863+
pub start: u64,
864+
/// The length of the file to scan.
865+
pub length: u64,
866+
/// The number of records in the file to scan.
867+
///
868+
/// This is an optional field, and only available if we are
869+
/// reading the entire data file.
870+
pub record_count: Option<u64>,
871+
872+
/// The data file path corresponding to the task.
873+
pub data_file_path: String,
874+
/// The content type of the file to scan.
875+
pub data_file_content: DataContentType,
876+
/// The format of the file to scan.
877+
pub data_file_format: DataFileFormat,
878+
879+
/// The schema of the file to scan.
880+
pub schema: SchemaRef,
881+
/// The field ids to project.
882+
pub project_field_ids: Vec<i32>,
883+
/// The predicate to filter.
861884
#[serde(skip_serializing_if = "Option::is_none")]
862-
predicate: Option<BoundPredicate>,
863-
schema: SchemaRef,
864-
}
865-
866-
impl FileScanTask {
867-
/// Returns the data file path of this file scan task.
868-
pub fn data_file_path(&self) -> &str {
869-
&self.data_file_path
870-
}
871-
872-
/// Returns the project field id of this file scan task.
873-
pub fn project_field_ids(&self) -> &[i32] {
874-
&self.project_field_ids
875-
}
876-
877-
/// Returns the predicate of this file scan task.
878-
pub fn predicate(&self) -> Option<&BoundPredicate> {
879-
self.predicate.as_ref()
880-
}
881-
882-
/// Returns the schema id of this file scan task.
883-
pub fn schema(&self) -> &Schema {
884-
&self.schema
885-
}
885+
pub predicate: Option<BoundPredicate>,
886886
}
887887

888888
#[cfg(test)]
@@ -1219,17 +1219,17 @@ mod tests {
12191219

12201220
assert_eq!(tasks.len(), 2);
12211221

1222-
tasks.sort_by_key(|t| t.data_file_path().to_string());
1222+
tasks.sort_by_key(|t| t.data_file_path.to_string());
12231223

12241224
// Check first task is added data file
12251225
assert_eq!(
1226-
tasks[0].data_file_path(),
1226+
tasks[0].data_file_path,
12271227
format!("{}/1.parquet", &fixture.table_location)
12281228
);
12291229

12301230
// Check second task is existing data file
12311231
assert_eq!(
1232-
tasks[1].data_file_path(),
1232+
tasks[1].data_file_path,
12331233
format!("{}/3.parquet", &fixture.table_location)
12341234
);
12351235
}
@@ -1582,22 +1582,28 @@ mod tests {
15821582
);
15831583
let task = FileScanTask {
15841584
data_file_path: "data_file_path".to_string(),
1585+
data_file_content: DataContentType::Data,
15851586
start: 0,
15861587
length: 100,
15871588
project_field_ids: vec![1, 2, 3],
15881589
predicate: None,
15891590
schema: schema.clone(),
1591+
record_count: Some(100),
1592+
data_file_format: DataFileFormat::Parquet,
15901593
};
15911594
test_fn(task);
15921595

15931596
// with predicate
15941597
let task = FileScanTask {
15951598
data_file_path: "data_file_path".to_string(),
1599+
data_file_content: DataContentType::Data,
15961600
start: 0,
15971601
length: 100,
15981602
project_field_ids: vec![1, 2, 3],
15991603
predicate: Some(BoundPredicate::AlwaysTrue),
16001604
schema,
1605+
record_count: None,
1606+
data_file_format: DataFileFormat::Avro,
16011607
};
16021608
test_fn(task);
16031609
}

crates/iceberg/src/spec/manifest.rs

+10 -2
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,9 @@ use std::sync::Arc;
2323

2424
use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
2525
use bytes::Bytes;
26+
use serde_derive::{Deserialize, Serialize};
2627
use serde_json::to_vec;
28+
use serde_with::{DeserializeFromStr, SerializeDisplay};
2729
use typed_builder::TypedBuilder;
2830

2931
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
@@ -866,6 +868,12 @@ impl ManifestEntry {
866868
&self.data_file.file_path
867869
}
868870

871+
/// Data file record count of the manifest entry.
872+
#[inline]
873+
pub fn record_count(&self) -> u64 {
874+
self.data_file.record_count
875+
}
876+
869877
/// Inherit data from manifest list, such as snapshot id, sequence number.
870878
pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
871879
if self.snapshot_id.is_none() {
@@ -1141,7 +1149,7 @@ impl DataFile {
11411149
}
11421150
/// Type of content stored by the data file: data, equality deletes, or
11431151
/// position deletes (all v1 files are data files)
1144-
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1152+
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
11451153
pub enum DataContentType {
11461154
/// value: 0
11471155
Data = 0,
@@ -1168,7 +1176,7 @@ impl TryFrom<i32> for DataContentType {
11681176
}
11691177

11701178
/// Format of this data.
1171-
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1179+
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
11721180
pub enum DataFileFormat {
11731181
/// Avro file format: <https://avro.apache.org/>
11741182
Avro,

0 commit comments

Comments
 (0)