Skip to content

Commit 859f945

Browse files
committed
feat: add PercentileTDigest support for MergeAndRollup aggregation
Add PercentileTDigestAggregator for minion merge/rollup tasks, enabling TDigest sketch merging with configurable compression factor. - New PercentileTDigestAggregator implementing ValueAggregator - Register PERCENTILETDIGEST and PERCENTILERAWTDIGEST in ValueAggregatorFactory - Add both types to AVAILABLE_CORE_VALUE_AGGREGATORS in MinionConstants - Allow compressionFactor in MergeRollupTaskGenerator validation - Unit tests for the aggregator with default and custom compression - Integration test exercising the full MergeRollupTaskExecutor pipeline with TDigest BYTES columns, multiple dimension groups, cross-segment merging, skewed distributions, and edge cases
1 parent 736cbf7 commit 859f945

File tree

7 files changed

+728
-4
lines changed

7 files changed

+728
-4
lines changed

pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ public static class RealtimeToOfflineSegmentsTask extends MergeTask {
189189
DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
190190
SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS,
191191
DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL,
192-
DISTINCTCOUNTRAWULL, PERCENTILEKLL, PERCENTILERAWKLL);
192+
DISTINCTCOUNTRAWULL, PERCENTILEKLL, PERCENTILERAWKLL, PERCENTILETDIGEST, PERCENTILERAWTDIGEST);
193193
}
194194

195195
// Generate segment and push to controller based on batch ingestion configs
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.core.segment.processing.aggregator;
20+
21+
import com.tdunning.math.stats.TDigest;
22+
import java.util.Map;
23+
import org.apache.pinot.core.common.ObjectSerDeUtils;
24+
import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
25+
import org.apache.pinot.segment.spi.Constants;
26+
27+
28+
public class PercentileTDigestAggregator implements ValueAggregator {
29+
30+
@Override
31+
public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
32+
String compressionParam = functionParameters.get(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY);
33+
34+
int compression;
35+
if (compressionParam != null) {
36+
compression = Integer.parseInt(compressionParam);
37+
} else {
38+
compression = PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION;
39+
}
40+
41+
TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value1);
42+
TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value2);
43+
TDigest merged = TDigest.createMergingDigest(compression);
44+
merged.add(first);
45+
merged.add(second);
46+
return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(merged);
47+
}
48+
}

pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
6464
case PERCENTILEKLL:
6565
case PERCENTILERAWKLL:
6666
return new PercentileKLLSketchAggregator();
67+
case PERCENTILETDIGEST:
68+
case PERCENTILERAWTDIGEST:
69+
return new PercentileTDigestAggregator();
6770
default:
6871
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
6972
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.core.segment.processing.aggregator;
20+
21+
import com.tdunning.math.stats.TDigest;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import org.apache.pinot.core.common.ObjectSerDeUtils;
25+
import org.apache.pinot.segment.spi.Constants;
26+
import org.testng.annotations.BeforeMethod;
27+
import org.testng.annotations.Test;
28+
29+
import static org.testng.Assert.assertEquals;
30+
import static org.testng.Assert.assertNotNull;
31+
32+
33+
public class PercentileTDigestAggregatorTest {
34+
35+
private PercentileTDigestAggregator _aggregator;
36+
37+
@BeforeMethod
38+
public void setUp() {
39+
_aggregator = new PercentileTDigestAggregator();
40+
}
41+
42+
@Test
43+
public void testAggregateWithDefaultCompression() {
44+
TDigest first = TDigest.createMergingDigest(100);
45+
for (int i = 0; i < 100; i++) {
46+
first.add(i);
47+
}
48+
TDigest second = TDigest.createMergingDigest(100);
49+
for (int i = 100; i < 200; i++) {
50+
second.add(i);
51+
}
52+
53+
byte[] value1 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(first);
54+
byte[] value2 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(second);
55+
56+
Map<String, String> functionParameters = new HashMap<>();
57+
byte[] result = (byte[]) _aggregator.aggregate(value1, value2, functionParameters);
58+
59+
TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result);
60+
assertNotNull(resultDigest);
61+
assertEquals(resultDigest.size(), 200);
62+
assertEquals(resultDigest.quantile(0.5), 99.5, 1.0);
63+
}
64+
65+
@Test
66+
public void testAggregateWithCustomCompression() {
67+
TDigest first = TDigest.createMergingDigest(100);
68+
for (int i = 0; i < 50; i++) {
69+
first.add(i);
70+
}
71+
TDigest second = TDigest.createMergingDigest(100);
72+
for (int i = 50; i < 100; i++) {
73+
second.add(i);
74+
}
75+
76+
byte[] value1 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(first);
77+
byte[] value2 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(second);
78+
79+
Map<String, String> functionParameters = new HashMap<>();
80+
functionParameters.put(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY, "200");
81+
82+
byte[] result = (byte[]) _aggregator.aggregate(value1, value2, functionParameters);
83+
84+
TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result);
85+
assertNotNull(resultDigest);
86+
assertEquals(resultDigest.size(), 100);
87+
assertEquals(resultDigest.quantile(0.5), 49.5, 1.0);
88+
}
89+
}

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,8 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<Stri
504504
// check no mis-configured aggregation function parameters
505505
Set<String> allowedFunctionParameterNames = ImmutableSet.of(Constants.CPCSKETCH_LGK_KEY.toLowerCase(),
506506
Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY.toLowerCase(),
507-
Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES.toLowerCase());
507+
Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES.toLowerCase(),
508+
Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY.toLowerCase());
508509
Map<String, Map<String, String>> aggregationFunctionParameters =
509510
MergeRollupTaskUtils.getAggregationFunctionParameters(taskConfigs);
510511
for (String fieldName : aggregationFunctionParameters.keySet()) {
@@ -515,10 +516,12 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<Stri
515516
for (String functionParameterName : functionParameters.keySet()) {
516517
// check that function parameter name is valid
517518
Preconditions.checkState(allowedFunctionParameterNames.contains(functionParameterName.toLowerCase()),
518-
"Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries]!");
519+
"Aggregation function parameter name must be one of [lgK, samplingProbability, nominalEntries,"
520+
+ " compressionFactor]!");
519521
// check that function parameter value is valid for nominal entries
520522
if (functionParameterName.equalsIgnoreCase(Constants.CPCSKETCH_LGK_KEY)
521-
|| functionParameterName.equalsIgnoreCase(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES)) {
523+
|| functionParameterName.equalsIgnoreCase(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES)
524+
|| functionParameterName.equalsIgnoreCase(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY)) {
522525
String value = functionParameters.get(functionParameterName);
523526
String err = "Aggregation function parameter \"" + functionParameterName + "\" on column \"" + fieldName
524527
+ "\" has invalid value: " + value;

0 commit comments

Comments
 (0)