Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
import org.apache.pinot.spi.exception.QueryException;
Expand Down Expand Up @@ -57,8 +58,14 @@ protected BaseSingleBlockCombineOperator(ResultsBlockMerger<T> resultsBlockMerge
/// @inheritDoc
///
/// Handles exceptions here so that execution stats can be attached.
/// When only a single task is needed and the subclass uses the default ResultsBlockMerger (not null), segments are
/// processed directly on the calling thread to avoid the overhead of thread submission, Phaser synchronization,
/// BlockingQueue polling, and atomic operations.
@Override
protected BaseResultsBlock getNextBlock() {
if (_numTasks == 1 && _resultsBlockMerger != null) {
return getNextBlockSingleThread();
}
try {
startProcess();
return checkTerminateExceptionAndAttachExecutionStats(mergeResults());
Expand All @@ -69,6 +76,57 @@ protected BaseResultsBlock getNextBlock() {
}
}

/// Processes all segments sequentially on the calling thread when only one task is needed.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing timeout protection. This fast path should respect _queryContext.getEndTimeMs() like the original mergeResults() method does. A hanging segment operator could block indefinitely here without timeout checking.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Added a System.currentTimeMillis() >= endTimeMs check at the top of each loop iteration before invoking operator.nextBlock(). If the deadline is exceeded, getTimeoutResultsBlock(i) is returned
immediately — exactly mirroring the waitTimeMs <= 0 guard in mergeResults(). A stalled segment operator will now be bypassed at the next iteration boundary rather than blocking indefinitely.

/// Avoids all concurrency overhead: no ExecutorService submission, no Phaser, no BlockingQueue, no atomics.
/// Respects the query deadline: if the timeout is exceeded before a segment operator is invoked, a timeout
/// results block is returned immediately rather than blocking indefinitely on a stalled operator.
@SuppressWarnings("unchecked")
private BaseResultsBlock getNextBlockSingleThread() {
ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
T mergedBlock = null;
long endTimeMs = _queryContext.getEndTimeMs();
try {
for (int i = 0; i < _numOperators; i++) {
// Check timeout before invoking each segment operator so we respect the query deadline
// even if a segment operator blocks for a long time (mirrors mergeResults() timeout logic).
if (System.currentTimeMillis() >= endTimeMs) {
return attachExecutionStats(getTimeoutResultsBlock(i));
}
Operator operator = _operators.get(i);
T resultsBlock;
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
}
resultsBlock = (T) operator.nextBlock();
} catch (RuntimeException e) {
return createExceptionResultsBlockAndAttachExecutionStats(wrapOperatorException(operator, e),
"processing segments");
} finally {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
if (resultsBlock.getErrorMessages() != null) {
// Propagate segment-level error immediately
return attachExecutionStats(resultsBlock);
}
if (mergedBlock == null) {
mergedBlock = resultsBlock;
} else {
_resultsBlockMerger.mergeResultsBlocks(mergedBlock, resultsBlock);
}
if (_resultsBlockMerger.isQuerySatisfied(mergedBlock)) {
break;
}
}
} finally {
_totalWorkerThreadCpuTimeNs.addAndGet(resourceSnapshot.getCpuTimeNs());
_totalWorkerThreadMemAllocatedBytes.addAndGet(resourceSnapshot.getAllocatedBytes());
}
return checkTerminateExceptionAndAttachExecutionStats(mergedBlock);
}

@Override
protected void processSegments() {
int operatorId;
Expand Down
Loading