perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1 #18146
deeppatel710 wants to merge 1 commit into apache:master
…n numTasks=1

When a query runs with a single execution task (e.g. one segment or maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the full multi-thread overhead: ExecutorService.submit(), Phaser registration/deregistration, BlockingQueue.poll() with timeout, AtomicInteger, and AtomicReference.

This adds a single-thread fast path in getNextBlock(): when _numTasks==1 and _resultsBlockMerger is non-null (i.e. the subclass uses the default merge strategy), segments are processed sequentially on the calling thread with none of that synchronization overhead. CPU time and memory are still tracked via ThreadResourceSnapshot.

Subclasses that override mergeResults() with custom logic (e.g. SequentialSortedGroupByCombineOperator, which passes null for _resultsBlockMerger) are unaffected and continue using the standard path.

Fixes apache#14617

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
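To illustrate why resource tracking can survive the move onto the calling thread, here is a minimal sketch that measures the calling thread's CPU time around a unit of work using the JDK's ThreadMXBean. It does not use ThreadResourceSnapshot, and it is only an assumption that a snapshot behaves in a broadly similar way; CallingThreadCpuSketch, Measured, and measure are hypothetical names.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.function.LongSupplier;

// Hypothetical sketch; not Pinot's ThreadResourceSnapshot.
final class CallingThreadCpuSketch {
  /** The work's result plus the CPU nanoseconds it used, or -1 if the JVM cannot measure it. */
  static final class Measured {
    final long value;
    final long cpuTimeNs;

    Measured(long value, long cpuTimeNs) {
      this.value = value;
      this.cpuTimeNs = cpuTimeNs;
    }
  }

  static Measured measure(LongSupplier work) {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    boolean supported = bean.isCurrentThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled();
    // Read the counter before and after; both reads happen on the thread that does the work.
    long start = supported ? bean.getCurrentThreadCpuTime() : 0L;
    long value = work.getAsLong();
    long cpuTimeNs = supported ? bean.getCurrentThreadCpuTime() - start : -1L;
    return new Measured(value, cpuTimeNs);
  }

  public static void main(String[] args) {
    Measured m = measure(() -> {
      long sum = 0;
      for (int i = 1; i <= 1_000_000; i++) {
        sum += i;
      }
      return sum;
    });
    System.out.println("value=" + m.value + " cpuTimeNs=" + m.cpuTimeNs);
  }
}
```

Because the work and the measurement share one thread, no hand-off of per-thread counters between a worker and the caller is needed in this sketch.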
xiangfu0 left a comment
I found a critical correctness issue with the fast path implementation:
Timeout Handling Missing: The new getNextBlockSingleThread() method lacks timeout protection that exists in the original mergeResults() method. The original method checks _queryContext.getEndTimeMs() and returns a timeout results block if the deadline is exceeded. The fast path has no such protection, which means:
- A hanging segment operator could block indefinitely
- The query timeout deadline is not respected
- Resources could be exhausted if a segment operator stalls
The original mergeResults() path handles this via _blockingQueue.poll(waitTimeMs, TimeUnit.MILLISECONDS) with explicit timeout checking. The fast path should implement similar timeout protection.
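One way to read this request is a deadline check between segments in the sequential loop. The following is a minimal sketch under that assumption, not the PR's code: DeadlineAwareSequentialCombine, Result, and combine are hypothetical names, segments are modeled as plain suppliers, and the clock is injected so the behavior can be tested. Note that a cooperative check like this only bounds the time spent between segments; it cannot interrupt a segment that hangs inside its own call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical stand-in types; none of these are Pinot classes.
final class DeadlineAwareSequentialCombine {
  /** Rows merged so far, plus a flag saying whether the deadline cut the loop short. */
  static final class Result {
    final List<Integer> rows;
    final boolean timedOut;

    Result(List<Integer> rows, boolean timedOut) {
      this.rows = rows;
      this.timedOut = timedOut;
    }
  }

  /**
   * Runs each segment on the calling thread, checking the deadline before every segment.
   * endTimeMs plays the role of the query deadline; clock supplies the current time in ms.
   */
  static Result combine(List<Supplier<List<Integer>>> segments, long endTimeMs, LongSupplier clock) {
    List<Integer> merged = new ArrayList<>();
    for (Supplier<List<Integer>> segment : segments) {
      if (clock.getAsLong() >= endTimeMs) {
        // Deadline reached: stop and report a timeout instead of starting another segment.
        return new Result(merged, true);
      }
      merged.addAll(segment.get());
    }
    return new Result(merged, false);
  }

  public static void main(String[] args) {
    List<Supplier<List<Integer>>> segments = new ArrayList<>();
    segments.add(() -> List.of(1, 2));
    segments.add(() -> List.of(3));
    Result result = combine(segments, System.currentTimeMillis() + 1_000L, System::currentTimeMillis);
    System.out.println("rows=" + result.rows + " timedOut=" + result.timedOut);
  }
}
```

A real operator would return a timeout results block rather than a flag; the flag is used here only to keep the sketch self-contained.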
/// Processes all segments sequentially on the calling thread when only one task is needed.
Missing timeout protection. This fast path should respect _queryContext.getEndTimeMs() like the original mergeResults() method does. A hanging segment operator could block indefinitely here without timeout checking.
Summary
Fixes #14617
When a query runs with a single execution task (one segment, or maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the full multi-thread overhead:
- ExecutorService.submit(): thread pool task submission
- Phaser: register/deregister synchronization
- BlockingQueue.poll(): with timeout waiting
- AtomicInteger / AtomicReference: volatile reads/writes on the hot path
None of this is necessary when _numTasks == 1.
Change
Added a getNextBlockSingleThread() fast path in BaseSingleBlockCombineOperator.getNextBlock(): when _numTasks == 1 and _resultsBlockMerger is non-null, all segments are processed sequentially on the calling thread with no synchronization overhead. CPU time and memory allocation are still tracked via ThreadResourceSnapshot.
Subclasses that override mergeResults() with custom logic (e.g. SequentialSortedGroupByCombineOperator, which passes null for _resultsBlockMerger) are unaffected and fall through to the standard multi-thread path.
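The dispatch described above can be sketched in miniature as follows. This is an illustration under simplifying assumptions, not the PR's implementation: CombineDispatchSketch and all its members are hypothetical, segments are modeled as LongSupplier values, the standard path submits one job per segment rather than one per task, and the segment list is assumed non-empty.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongBinaryOperator;
import java.util.function.LongSupplier;

// Hypothetical miniature of a combine operator. None of these names are Pinot's.
class CombineDispatchSketch {
  private final List<LongSupplier> segments;
  private final int numTasks;
  private final LongBinaryOperator merger; // null: the subclass merges with its own logic
  String lastPath = "none";                // records which path ran, for illustration only

  CombineDispatchSketch(List<LongSupplier> segments, int numTasks, LongBinaryOperator merger) {
    this.segments = segments;
    this.numTasks = numTasks;
    this.merger = merger;
  }

  long getNextBlock() {
    // Take the fast path only when one task is planned AND the default merger is in use.
    if (numTasks == 1 && merger != null) {
      lastPath = "fast";
      return combineOnCallingThread();
    }
    lastPath = "standard";
    return combineWithWorkers();
  }

  // Fast path: no executor and no queue; segments run one after another on the caller.
  private long combineOnCallingThread() {
    long merged = segments.get(0).getAsLong();
    for (int i = 1; i < segments.size(); i++) {
      merged = merger.applyAsLong(merged, segments.get(i).getAsLong());
    }
    return merged;
  }

  // Standard path, simplified to one submitted job per segment.
  private long combineWithWorkers() {
    ExecutorService executor = Executors.newFixedThreadPool(numTasks);
    BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    try {
      for (LongSupplier segment : segments) {
        executor.submit(() -> { queue.add(segment.getAsLong()); });
      }
      return mergeResults(queue, segments.size());
    } finally {
      executor.shutdownNow();
    }
  }

  // Default merge; a subclass that passes a null merger is expected to override this.
  protected long mergeResults(BlockingQueue<Long> queue, int expected) {
    try {
      Long merged = null;
      for (int i = 0; i < expected; i++) {
        Long next = queue.poll(5, TimeUnit.SECONDS);
        if (next == null) {
          throw new IllegalStateException("timed out waiting for a segment result");
        }
        merged = (merged == null) ? next : merger.applyAsLong(merged, next);
      }
      return merged;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    List<LongSupplier> segments = List.<LongSupplier>of(() -> 1L, () -> 2L, () -> 3L);
    CombineDispatchSketch one = new CombineDispatchSketch(segments, 1, Long::sum);
    System.out.println(one.getNextBlock() + " via " + one.lastPath);
    CombineDispatchSketch two = new CombineDispatchSketch(segments, 2, Long::sum);
    System.out.println(two.getNextBlock() + " via " + two.lastPath);
  }
}
```

Guarding on the merger being non-null, rather than on the task count alone, is what keeps subclasses with custom merge logic on the standard path even when they run a single task.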
Test plan
- CombineSlowOperatorsTest, CombineErrorOperatorsTest, SelectionCombineOperatorTest, SortedGroupByCombineOperatorsTest, CombinePlanNodeTest: all green