Skip to content

Commit 2bf5956

Browse files
lintingbinczy006
andauthored
[AMORO-3424] Add the self-optimizing.partition-filter parameter (#3426)
* feature: add mvp code. * feature: use jsqlparser replace spark-sql * feature: degrade jsqlparser version to 4.5 * change jsqlparser 4.5 to 4.7 * add optimizing evaluator test cases. * optimize code. * add test case for "". --------- Co-authored-by: ConradJam <[email protected]>
1 parent a9f03ed commit 2bf5956

File tree

11 files changed

+462
-20
lines changed

11 files changed

+462
-20
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java

+5
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
294294
properties,
295295
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES,
296296
TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT))
297+
.setFilter(
298+
CompatiblePropertyUtil.propertyAsString(
299+
properties,
300+
TableProperties.SELF_OPTIMIZING_FILTER,
301+
TableProperties.SELF_OPTIMIZING_FILTER_DEFAULT))
297302
.setBaseHashBucket(
298303
CompatiblePropertyUtil.propertyAsInt(
299304
properties,

amoro-ams/src/test/java/org/apache/amoro/server/optimizing/plan/TestOptimizingEvaluator.java

+78
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
3434
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
3535
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
36+
import org.apache.amoro.table.TableProperties;
3637
import org.apache.amoro.table.TableSnapshot;
3738
import org.apache.iceberg.DataFile;
3839
import org.apache.iceberg.StructLike;
@@ -121,6 +122,83 @@ public void testFragmentFiles() {
121122
assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles));
122123
}
123124

125+
@Test
126+
public void testFragmentFilesWithPartitionFilterTimeStamp() {
127+
getMixedTable()
128+
.updateProperties()
129+
.set(TableProperties.SELF_OPTIMIZING_FILTER, "op_time >= '2022-01-01T12:00:00'")
130+
.commit();
131+
testFragmentFilesWithPartitionFilterDo(true);
132+
133+
getMixedTable()
134+
.updateProperties()
135+
.set(TableProperties.SELF_OPTIMIZING_FILTER, "op_time > '2022-01-01T12:00:00'")
136+
.commit();
137+
testFragmentFilesWithPartitionFilterDo(false);
138+
}
139+
140+
@Test
141+
public void testFragmentFilesWithPartitionFilterInteger() {
142+
getMixedTable()
143+
.updateProperties()
144+
.set(TableProperties.SELF_OPTIMIZING_FILTER, "id > 0")
145+
.commit();
146+
testFragmentFilesWithPartitionFilterDo(true);
147+
148+
getMixedTable()
149+
.updateProperties()
150+
.set(TableProperties.SELF_OPTIMIZING_FILTER, "id > 8")
151+
.commit();
152+
testFragmentFilesWithPartitionFilterDo(false);
153+
}
154+
155+
@Test
156+
public void testFragmentFilesWithPartitionFilterString() {
157+
getMixedTable()
158+
.updateProperties()
159+
.set(TableProperties.SELF_OPTIMIZING_FILTER, "name > '0'")
160+
.commit();
161+
testFragmentFilesWithPartitionFilterDo(true);
162+
163+
getMixedTable()
164+
.updateProperties()
165+
.set(TableProperties.SELF_OPTIMIZING_FILTER, "name > '8'")
166+
.commit();
167+
testFragmentFilesWithPartitionFilterDo(false);
168+
}
169+
170+
private void testFragmentFilesWithPartitionFilterDo(boolean isNecessary) {
171+
closeFullOptimizingInterval();
172+
updateBaseHashBucket(1);
173+
List<DataFile> dataFiles = Lists.newArrayList();
174+
List<Record> newRecords =
175+
OptimizingTestHelpers.generateRecord(tableTestHelper(), 1, 4, "2022-01-01T12:00:00");
176+
long transactionId = beginTransaction();
177+
dataFiles.addAll(
178+
OptimizingTestHelpers.appendBase(
179+
getMixedTable(),
180+
tableTestHelper().writeBaseStore(getMixedTable(), transactionId, newRecords, false)));
181+
182+
// add more files
183+
newRecords =
184+
OptimizingTestHelpers.generateRecord(tableTestHelper(), 5, 8, "2022-01-01T12:00:00");
185+
transactionId = beginTransaction();
186+
dataFiles.addAll(
187+
OptimizingTestHelpers.appendBase(
188+
getMixedTable(),
189+
tableTestHelper().writeBaseStore(getMixedTable(), transactionId, newRecords, false)));
190+
191+
AbstractOptimizingEvaluator optimizingEvaluator = buildOptimizingEvaluator();
192+
if (isNecessary) {
193+
Assert.assertTrue(optimizingEvaluator.isNecessary());
194+
AbstractOptimizingEvaluator.PendingInput pendingInput =
195+
optimizingEvaluator.getOptimizingPendingInput();
196+
assertInput(pendingInput, FileInfo.buildFileInfo(dataFiles));
197+
} else {
198+
Assert.assertFalse(optimizingEvaluator.isNecessary());
199+
}
200+
}
201+
124202
protected AbstractOptimizingEvaluator buildOptimizingEvaluator() {
125203
TableSnapshot snapshot = IcebergTableUtil.getSnapshot(getMixedTable(), tableRuntime);
126204
return IcebergTableUtil.createOptimizingEvaluator(tableRuntime, getMixedTable(), snapshot, 100);

amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java

+15
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class OptimizingConfig {
7474
// self-optimizing.full.rewrite-all-files
7575
private boolean fullRewriteAllFiles;
7676

77+
// self-optimizing.filter
78+
private String filter;
79+
7780
// base.file-index.hash-bucket
7881
private int baseHashBucket;
7982

@@ -240,6 +243,15 @@ public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) {
240243
return this;
241244
}
242245

246+
public OptimizingConfig setFilter(String filter) {
247+
this.filter = filter;
248+
return this;
249+
}
250+
251+
public String getFilter() {
252+
return filter;
253+
}
254+
243255
public int getBaseHashBucket() {
244256
return baseHashBucket;
245257
}
@@ -291,6 +303,7 @@ public boolean equals(Object o) {
291303
&& Double.compare(that.majorDuplicateRatio, majorDuplicateRatio) == 0
292304
&& fullTriggerInterval == that.fullTriggerInterval
293305
&& fullRewriteAllFiles == that.fullRewriteAllFiles
306+
&& Objects.equal(filter, that.filter)
294307
&& baseHashBucket == that.baseHashBucket
295308
&& baseRefreshInterval == that.baseRefreshInterval
296309
&& hiveRefreshInterval == that.hiveRefreshInterval
@@ -317,6 +330,7 @@ public int hashCode() {
317330
majorDuplicateRatio,
318331
fullTriggerInterval,
319332
fullRewriteAllFiles,
333+
filter,
320334
baseHashBucket,
321335
baseRefreshInterval,
322336
hiveRefreshInterval,
@@ -341,6 +355,7 @@ public String toString() {
341355
.add("majorDuplicateRatio", majorDuplicateRatio)
342356
.add("fullTriggerInterval", fullTriggerInterval)
343357
.add("fullRewriteAllFiles", fullRewriteAllFiles)
358+
.add("filter", filter)
344359
.add("baseHashBucket", baseHashBucket)
345360
.add("baseRefreshInterval", baseRefreshInterval)
346361
.add("hiveRefreshInterval", hiveRefreshInterval)

amoro-format-iceberg/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@
154154
<artifactId>paimon-bundle</artifactId>
155155
</dependency>
156156

157+
<dependency>
158+
<groupId>com.github.jsqlparser</groupId>
159+
<artifactId>jsqlparser</artifactId>
160+
</dependency>
161+
157162
<dependency>
158163
<groupId>org.roaringbitmap</groupId>
159164
<artifactId>RoaringBitmap</artifactId>

amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingEvaluator.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@
3232
import org.apache.amoro.table.KeyedTableSnapshot;
3333
import org.apache.amoro.table.MixedTable;
3434
import org.apache.amoro.table.TableSnapshot;
35+
import org.apache.amoro.utils.ExpressionUtil;
3536
import org.apache.iceberg.PartitionSpec;
3637
import org.apache.iceberg.Snapshot;
3738
import org.apache.iceberg.SnapshotSummary;
3839
import org.apache.iceberg.StructLike;
3940
import org.apache.iceberg.expressions.Expression;
40-
import org.apache.iceberg.expressions.Expressions;
4141
import org.apache.iceberg.io.CloseableIterable;
4242
import org.apache.iceberg.util.Pair;
4343
import org.apache.iceberg.util.PropertyUtil;
@@ -111,7 +111,8 @@ protected void initEvaluator() {
111111
}
112112

113113
protected Expression getPartitionFilter() {
114-
return Expressions.alwaysTrue();
114+
return ExpressionUtil.convertSqlFilterToIcebergExpression(
115+
config.getFilter(), mixedTable.schema().columns());
115116
}
116117

117118
private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
@@ -142,7 +143,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
142143
partitionPlanMap.entrySet().stream()
143144
.filter(entry -> entry.getValue().isNecessary())
144145
.limit(maxPendingPartitions)
145-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));
146+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
146147
}
147148

148149
protected abstract PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> partition);

amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/plan/AbstractOptimizingPlanner.java

+5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iceberg.PartitionSpec;
3232
import org.apache.iceberg.StructLike;
3333
import org.apache.iceberg.expressions.Expression;
34+
import org.apache.iceberg.expressions.Expressions;
3435
import org.apache.iceberg.util.Pair;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
@@ -110,6 +111,10 @@ public Map<String, Long> getToSequence() {
110111

111112
@Override
112113
protected Expression getPartitionFilter() {
114+
if (Expressions.alwaysTrue().equals(partitionFilter)
115+
&& !Expressions.alwaysTrue().equals(super.getPartitionFilter())) {
116+
return super.getPartitionFilter();
117+
}
113118
return partitionFilter;
114119
}
115120

amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java

+3
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ private TableProperties() {}
115115
"self-optimizing.full.rewrite-all-files";
116116
public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true;
117117

118+
public static final String SELF_OPTIMIZING_FILTER = "self-optimizing.filter";
119+
public static final String SELF_OPTIMIZING_FILTER_DEFAULT = null;
120+
118121
public static final String SELF_OPTIMIZING_MIN_PLAN_INTERVAL =
119122
"self-optimizing.min-plan-interval";
120123
public static final long SELF_OPTIMIZING_MIN_PLAN_INTERVAL_DEFAULT = 60000;

0 commit comments

Comments
 (0)