Skip to content

Commit fea1817

Browse files
committed
Merge branch 'feat/safe-partition-spec'
1 parent 1f49c95 commit fea1817

File tree

10 files changed

+1083
-608
lines changed

10 files changed

+1083
-608
lines changed

crates/catalog/memory/src/catalog.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ mod tests {
355355

356356
assert_eq!(metadata.current_schema().as_ref(), expected_schema);
357357

358-
let expected_partition_spec = PartitionSpec::builder(expected_schema)
358+
let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
359359
.with_spec_id(0)
360360
.build()
361361
.unwrap();
@@ -365,7 +365,7 @@ mod tests {
365365
.partition_specs_iter()
366366
.map(|p| p.as_ref())
367367
.collect_vec(),
368-
vec![&expected_partition_spec]
368+
vec![&expected_partition_spec.into_schemaless()]
369369
);
370370

371371
let expected_sorted_order = SortOrder::builder()

crates/iceberg/src/expr/visitors/expression_evaluator.rs

+64-67
Large diffs are not rendered by default.

crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,10 @@ mod test {
504504

505505
#[test]
506506
fn test_data_file_no_partitions() {
507-
let (table_schema_ref, _partition_spec_ref) = create_test_schema_and_partition_spec();
507+
let partition_spec_ref = create_test_partition_spec();
508508

509509
let partition_filter = Predicate::AlwaysTrue
510-
.bind(table_schema_ref.clone(), false)
510+
.bind(partition_spec_ref.schema_ref().clone(), false)
511511
.unwrap();
512512

513513
let case_sensitive = false;
@@ -1645,7 +1645,7 @@ mod test {
16451645
assert!(result, "Should read: NotIn on no nulls column");
16461646
}
16471647

1648-
fn create_test_schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
1648+
fn create_test_partition_spec() -> Arc<PartitionSpec> {
16491649
let table_schema = Schema::builder()
16501650
.with_fields(vec![Arc::new(NestedField::optional(
16511651
1,
@@ -1656,7 +1656,7 @@ mod test {
16561656
.unwrap();
16571657
let table_schema_ref = Arc::new(table_schema);
16581658

1659-
let partition_spec = PartitionSpec::builder(&table_schema_ref)
1659+
let partition_spec = PartitionSpec::builder(table_schema_ref.clone())
16601660
.with_spec_id(1)
16611661
.add_unbound_fields(vec![UnboundPartitionField::builder()
16621662
.source_id(1)
@@ -1667,8 +1667,7 @@ mod test {
16671667
.unwrap()
16681668
.build()
16691669
.unwrap();
1670-
let partition_spec_ref = Arc::new(partition_spec);
1671-
(table_schema_ref, partition_spec_ref)
1670+
Arc::new(partition_spec)
16721671
}
16731672

16741673
fn not_null(reference: &str) -> BoundPredicate {

crates/iceberg/src/expr/visitors/inclusive_projection.rs

+110-45
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ use fnv::FnvHashSet;
2121

2222
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
2323
use crate::expr::{BoundPredicate, BoundReference, Predicate};
24-
use crate::spec::{Datum, PartitionField, PartitionSpecRef};
24+
use crate::spec::{Datum, PartitionField, SchemalessPartitionSpecRef};
2525
use crate::Error;
2626

2727
pub(crate) struct InclusiveProjection {
28-
partition_spec: PartitionSpecRef,
28+
partition_spec: SchemalessPartitionSpecRef,
2929
cached_parts: HashMap<i32, Vec<PartitionField>>,
3030
}
3131

3232
impl InclusiveProjection {
33-
pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self {
33+
pub(crate) fn new(partition_spec: SchemalessPartitionSpecRef) -> Self {
3434
Self {
3535
partition_spec,
3636
cached_parts: HashMap::new(),
@@ -235,7 +235,7 @@ mod tests {
235235
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
236236
use crate::expr::{Bind, Predicate, Reference};
237237
use crate::spec::{
238-
Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
238+
Datum, NestedField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
239239
UnboundPartitionField,
240240
};
241241

@@ -265,13 +265,14 @@ mod tests {
265265
#[test]
266266
fn test_inclusive_projection_logic_ops() {
267267
let schema = build_test_schema();
268+
let arc_schema = Arc::new(schema);
268269

269-
let partition_spec = PartitionSpec::builder(&schema)
270+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
270271
.with_spec_id(1)
271272
.build()
272-
.unwrap();
273+
.unwrap()
274+
.into_schemaless();
273275

274-
let arc_schema = Arc::new(schema);
275276
let arc_partition_spec = Arc::new(partition_spec);
276277

277278
// this predicate contains only logic operators,
@@ -295,8 +296,9 @@ mod tests {
295296
#[test]
296297
fn test_inclusive_projection_identity_transform() {
297298
let schema = build_test_schema();
299+
let arc_schema = Arc::new(schema);
298300

299-
let partition_spec = PartitionSpec::builder(&schema)
301+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
300302
.with_spec_id(1)
301303
.add_unbound_field(
302304
UnboundPartitionField::builder()
@@ -308,9 +310,9 @@ mod tests {
308310
)
309311
.unwrap()
310312
.build()
311-
.unwrap();
313+
.unwrap()
314+
.into_schemaless();
312315

313-
let arc_schema = Arc::new(schema);
314316
let arc_partition_spec = Arc::new(partition_spec);
315317

316318
let unbound_predicate = Reference::new("a").less_than(Datum::int(10));
@@ -321,7 +323,7 @@ mod tests {
321323
// should result in the same Predicate as the original
322324
// `unbound_predicate`, since we have just a single partition field,
323325
// and it has an Identity transform
324-
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
326+
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
325327
let result = inclusive_projection.project(&bound_predicate).unwrap();
326328

327329
let expected = "a < 10".to_string();
@@ -330,34 +332,95 @@ mod tests {
330332
}
331333

332334
#[test]
333-
fn test_inclusive_projection_date_transforms() {
335+
fn test_inclusive_projection_date_year_transform() {
334336
let schema = build_test_schema();
337+
let arc_schema = Arc::new(schema);
338+
339+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
340+
.with_spec_id(1)
341+
.add_unbound_fields(vec![UnboundPartitionField {
342+
source_id: 2,
343+
name: "year".to_string(),
344+
field_id: Some(1000),
345+
transform: Transform::Year,
346+
}])
347+
.unwrap()
348+
.build()
349+
.unwrap()
350+
.into_schemaless();
351+
352+
let arc_partition_spec = Arc::new(partition_spec);
353+
354+
let unbound_predicate =
355+
Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());
356+
357+
let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap();
358+
359+
// applying InclusiveProjection to bound_predicate
360+
// should result in a predicate that correctly handles
361+
// year, month and date
362+
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
363+
let result = inclusive_projection.project(&bound_predicate).unwrap();
364+
365+
let expected = "year <= 53".to_string();
366+
367+
assert_eq!(result.to_string(), expected);
368+
}
369+
370+
#[test]
371+
fn test_inclusive_projection_date_month_transform() {
372+
let schema = build_test_schema();
373+
let arc_schema = Arc::new(schema);
374+
375+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
376+
.with_spec_id(1)
377+
.add_unbound_fields(vec![UnboundPartitionField {
378+
source_id: 2,
379+
name: "month".to_string(),
380+
field_id: Some(1000),
381+
transform: Transform::Month,
382+
}])
383+
.unwrap()
384+
.build()
385+
.unwrap()
386+
.into_schemaless();
387+
388+
let arc_partition_spec = Arc::new(partition_spec);
389+
390+
let unbound_predicate =
391+
Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());
392+
393+
let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap();
335394

336-
let partition_spec = PartitionSpec {
337-
spec_id: 1,
338-
fields: vec![
339-
PartitionField {
340-
source_id: 2,
341-
name: "year".to_string(),
342-
field_id: 1000,
343-
transform: Transform::Year,
344-
},
345-
PartitionField {
346-
source_id: 2,
347-
name: "month".to_string(),
348-
field_id: 1001,
349-
transform: Transform::Month,
350-
},
351-
PartitionField {
352-
source_id: 2,
353-
name: "day".to_string(),
354-
field_id: 1002,
355-
transform: Transform::Day,
356-
},
357-
],
358-
};
395+
// applying InclusiveProjection to bound_predicate
396+
// should result in a predicate that correctly handles
397+
// year, month and date
398+
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
399+
let result = inclusive_projection.project(&bound_predicate).unwrap();
400+
401+
let expected = "month <= 647".to_string();
402+
403+
assert_eq!(result.to_string(), expected);
404+
}
359405

406+
#[test]
407+
fn test_inclusive_projection_date_day_transform() {
408+
let schema = build_test_schema();
360409
let arc_schema = Arc::new(schema);
410+
411+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
412+
.with_spec_id(1)
413+
.add_unbound_fields(vec![UnboundPartitionField {
414+
source_id: 2,
415+
name: "day".to_string(),
416+
field_id: Some(1000),
417+
transform: Transform::Day,
418+
}])
419+
.unwrap()
420+
.build()
421+
.unwrap()
422+
.into_schemaless();
423+
361424
let arc_partition_spec = Arc::new(partition_spec);
362425

363426
let unbound_predicate =
@@ -368,19 +431,20 @@ mod tests {
368431
// applying InclusiveProjection to bound_predicate
369432
// should result in a predicate that correctly handles
370433
// year, month and date
371-
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
434+
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
372435
let result = inclusive_projection.project(&bound_predicate).unwrap();
373436

374-
let expected = "((year <= 53) AND (month <= 647)) AND (day <= 19722)".to_string();
437+
let expected = "day <= 19722".to_string();
375438

376439
assert_eq!(result.to_string(), expected);
377440
}
378441

379442
#[test]
380443
fn test_inclusive_projection_truncate_transform() {
381444
let schema = build_test_schema();
445+
let arc_schema = Arc::new(schema);
382446

383-
let partition_spec = PartitionSpec::builder(&schema)
447+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
384448
.with_spec_id(1)
385449
.add_unbound_field(
386450
UnboundPartitionField::builder()
@@ -392,9 +456,9 @@ mod tests {
392456
)
393457
.unwrap()
394458
.build()
395-
.unwrap();
459+
.unwrap()
460+
.into_schemaless();
396461

397-
let arc_schema = Arc::new(schema);
398462
let arc_partition_spec = Arc::new(partition_spec);
399463

400464
let unbound_predicate = Reference::new("name").starts_with(Datum::string("Testy McTest"));
@@ -408,7 +472,7 @@ mod tests {
408472
// name that start with "Testy McTest" into a partition
409473
// for values of name that start with the first four letters
410474
// of that, ie "Test".
411-
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
475+
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
412476
let result = inclusive_projection.project(&bound_predicate).unwrap();
413477

414478
let expected = "name_truncate STARTS WITH \"Test\"".to_string();
@@ -419,8 +483,9 @@ mod tests {
419483
#[test]
420484
fn test_inclusive_projection_bucket_transform() {
421485
let schema = build_test_schema();
486+
let arc_schema = Arc::new(schema);
422487

423-
let partition_spec = PartitionSpec::builder(&schema)
488+
let partition_spec = PartitionSpec::builder(arc_schema.clone())
424489
.with_spec_id(1)
425490
.add_unbound_field(
426491
UnboundPartitionField::builder()
@@ -432,9 +497,9 @@ mod tests {
432497
)
433498
.unwrap()
434499
.build()
435-
.unwrap();
500+
.unwrap()
501+
.into_schemaless();
436502

437-
let arc_schema = Arc::new(schema);
438503
let arc_partition_spec = Arc::new(partition_spec);
439504

440505
let unbound_predicate = Reference::new("a").equal_to(Datum::int(10));
@@ -445,7 +510,7 @@ mod tests {
445510
// should result in the "a = 10" predicate being
446511
// transformed into "a = 2", since 10 gets bucketed
447512
// to 2 with a Bucket(7) partition
448-
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
513+
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
449514
let result = inclusive_projection.project(&bound_predicate).unwrap();
450515

451516
let expected = "a_bucket[7] = 2".to_string();

crates/iceberg/src/io/object_cache.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ mod tests {
262262
)
263263
.write(Manifest::new(
264264
ManifestMetadata::builder()
265-
.schema((*current_schema).clone())
265+
.schema(current_schema.clone())
266266
.content(ManifestContentType::Data)
267267
.format_version(FormatVersion::V2)
268268
.partition_spec((**current_partition_spec).clone())

crates/iceberg/src/scan.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ impl PartitionFilterCache {
688688
&self,
689689
spec_id: i32,
690690
table_metadata: &TableMetadataRef,
691-
schema: &SchemaRef,
691+
schema: &Schema,
692692
case_sensitive: bool,
693693
filter: BoundPredicate,
694694
) -> Result<Arc<BoundPredicate>> {
@@ -714,11 +714,11 @@ impl PartitionFilterCache {
714714
format!("Could not find partition spec for id {}", spec_id),
715715
))?;
716716

717-
let partition_type = partition_spec.partition_type(schema.as_ref())?;
717+
let partition_type = partition_spec.partition_type(schema)?;
718718
let partition_fields = partition_type.fields().to_owned();
719719
let partition_schema = Arc::new(
720720
Schema::builder()
721-
.with_schema_id(partition_spec.spec_id)
721+
.with_schema_id(partition_spec.spec_id())
722722
.with_fields(partition_fields)
723723
.build()?,
724724
);
@@ -1012,7 +1012,7 @@ mod tests {
10121012
)
10131013
.write(Manifest::new(
10141014
ManifestMetadata::builder()
1015-
.schema((*current_schema).clone())
1015+
.schema(current_schema.clone())
10161016
.content(ManifestContentType::Data)
10171017
.format_version(FormatVersion::V2)
10181018
.partition_spec((**current_partition_spec).clone())

0 commit comments

Comments
 (0)