Skip to content

Lazily set the producer head at execution time#478

Merged
gabotechs merged 2 commits into
mainfrom
gabrielmusat/producer-head
Jun 11, 2026
Merged

Lazily set the producer head at execution time#478
gabotechs merged 2 commits into
mainfrom
gabrielmusat/producer-head

Conversation

@gabotechs

@gabotechs gabotechs commented Jun 1, 2026

Copy link
Copy Markdown
Collaborator

This is one PR from the following stack of PRs:


Introduces the ProducerHead type:

pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a [BroadcastExec].
    BroadcastExec { output_partitions: usize },
    /// The head node should be a [RepartitionExec].
    RepartitionExec { partitioning: Partitioning },
}

Which is passed over the network while remotely executing tasks in order to set the appropriate node at the head of a stage.

Today, this is a noop because the right head node in stages is ensured statically at planning time, but in follow up PRs, network boundaries can get swamped and reorganized arbitrarily.

One example that happens in AQE:

  1. A JOIN is planned as a CollectLeft
HashJoinExec: mode=CollectLeft
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: unknown size
  DistributedLeafExec: unknown size
  1. While collecting runtime statistics, it happens that Stage 1 is huge, and during AQE the JOINs are swapped
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: big size
  1. The Stage 1 is now on the probe side, so it needs to be rewritten to a NetworkShuffleExec, otherwise duplicate data will be returned:
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkShuffleExec
      RepartitionExec // <- dynamically swapped at runtime based on the passed `ProducerHead`
        DistributedLeafExec: big size

Passing a ProducerHead at execution time unlocks two things:

  1. dynamically set the fanout width accounting for a dynamically scaled upper stage
  2. dynamically set the correct operator BroadcastExec or RepartitionExec based on the network boundary above, which might have changed because of AQE

@gabotechs gabotechs changed the base branch from main to gabrielmusat/max-gauge June 1, 2026 12:58
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from 8e539eb to c1922d9 Compare June 1, 2026 13:01
@gabotechs gabotechs mentioned this pull request Jun 1, 2026
@gabotechs gabotechs marked this pull request as ready for review June 1, 2026 13:17
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from c1922d9 to b505fdb Compare June 2, 2026 13:30
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from 520fe2c to 82af353 Compare June 2, 2026 13:30
gabotechs added a commit that referenced this pull request Jun 2, 2026
This is one PR from the following stack of PRs:
- #477
<- you are here
- #463
- #464
- #478
- #479
- #432

---

Network boundaries in this project are currently breaking one assumption
from upstream DataFusion:

`SendableRecordBatchStream`s yield record batches in two situations:
- If explicitly polled
- Eagerly on an spawned task triggered by the first poll


https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/physical-plan/src/execution_plan.rs#L973-L988

Today, network boundaries pulling from remote sources are breaking this
rule, because they start yielding `RecordBatches` over the network even
if no poll has ever happened to the `SendableRecordBatchStream` returned
by the network boundary.

This has two consequences:
1. Greater memory consumption, as data will get accumulated in the
network boundaries while nobody is polling for it.
2. Greater speed on JOINs, as an artifact of eagerly buffering right
sides even before they are ever polled

Consequence 2 is nice, but it should be delivered using standard
upstream mechanisms:


https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/common/src/config.rs#L695-L709

Not accidentally by how remote network boundaries work.

---

This PR makes it so that remote network boundaries only start the
network stream on first poll, instead of on `.execute()` call, as stated
by the `EvaluationType::Eager` docs.
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from b505fdb to b1b8dcb Compare June 2, 2026 15:06
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from 82af353 to b2f8e5e Compare June 2, 2026 15:06
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from b1b8dcb to 440b3f5 Compare June 2, 2026 15:15
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from b2f8e5e to ca79034 Compare June 2, 2026 15:15
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from 440b3f5 to 1968bb7 Compare June 8, 2026 10:48
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from ca79034 to 431d5a2 Compare June 8, 2026 10:48
@gabotechs gabotechs mentioned this pull request Jun 8, 2026
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from 1968bb7 to d33afcc Compare June 8, 2026 12:19
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from 431d5a2 to fe92488 Compare June 8, 2026 12:19
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch 2 times, most recently from 4496e3f to ed657d2 Compare June 8, 2026 14:45
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from fe92488 to 7b7b571 Compare June 8, 2026 14:51
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from ed657d2 to ce7fb75 Compare June 8, 2026 14:58
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from 7b7b571 to 423e39a Compare June 8, 2026 15:11
@gabotechs gabotechs force-pushed the gabrielmusat/max-gauge branch from ce7fb75 to 441badb Compare June 9, 2026 06:42
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from 423e39a to 8ab2da5 Compare June 9, 2026 06:42
}

Ok(input)
let input = if let Some(r_exec) = input.downcast_ref::<RepartitionExec>() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move this into the match below and error if something unexpected happens? Ex. if you match on ProducerHead::BroadcastExec and input.downcast_ref::<BroadcastExec>() is None, then error?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not really anything unexpected that can happen here. Actually, the only reason for this function to return a Result<> is because upstream RepartitionExec::try_new() signature requires it.

If the passed ProducerHead is a ProducerHead::BroadcastExec, but the input node is a RepartitionExec, this is fine and expected, it just means that we need to swap the RepartitionExec by a BroadcastExec.

This happens typically while re-ordering JOIN operations on the fly, maybe we swap what was a shuffle join by a broadcast join, and we need to swap the producer head at runtime.

Of course, that example is not possible in this PR, but it will start happening relatively soon.

}

/// Defines what shape should the head node of a stage have upon getting executed. Depending
/// on the [NetworkBoundary] implementation, the stage below should have different head nodes.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mega nit: in upstream they add backticks like [NetworkBoundary]

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I've been noticing for a while. I think this has to do in how things render in docs.rs. We should probably actually:

  1. Render this on docs.rs
  2. If we think it's indeed weird, do a PR fully dedicated to adding backticks in all docs at once

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created this issue for tracking:

#495


/// Defines what shape should the head node of a stage have upon getting executed. Depending
/// on the [NetworkBoundary] implementation, the stage below should have different head nodes.
pub enum ProducerHead {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe NetworkBoundaryProducer is a better name. I don't feel that strongly though.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the fact that the "Head" keyword is in there, as it states that we are not referring to a full plan, and we are referring to just the head of it.

properties: Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&self.schema)),
Partitioning::UnknownPartitioning(self.bench.partitions),
Partitioning::RoundRobinBatch(self.bench.partitions),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the ProducerHead of a NetworkShuffleExec always a RepartitionExec with Hash partitioning? So the output partitioning of the NetworkShuffleExec is Hash as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today, that happens always yes, but there are legitimate cases for this to be a RoundRobin shuffle: whenever a JOIN order in a CollectLeft join is swapped:

  • The left side goes to the right, moving from a broadcast to a round robin shuffle (as we don't have a set of expressions to hash)
  • The right side goes to the left, and whatever it was, it gets swapped to being a broadcast

I needed to change this line here today just because otherwise, this benchmark will try to set a ProducerHead in the producing stage to RepartitionExec(UnknownPartitioning), which fails to build in DataFusion. Instead, a RoundRobin is passed so that we can build that RepartitionExec successfully, which does not really matter what it is here, as this is just a benchmark for the transport part.

gabotechs added a commit that referenced this pull request Jun 11, 2026
This is one PR from the following stack of PRs:
- #477
- #463
<- you are here
- #464
- #478
- #479
- #486
- #432

This PR introduces a NetworkBoundaryBuilder argument to the network
boundary injection logic, allowing more flexible and configurable
strategies for determining which exchanges require network
communication. This enables better optimization of data movement across
distributed tasks.
gabotechs added a commit that referenced this pull request Jun 11, 2026
This is one PR from the following stack of PRs:

- #477
- #463
- #464
<- you are here
- #478
- #479
- #486
- #432

This PR introduces a MaxGauge metric to provide better tracking of peak
values in distributed metrics collection. This enables more accurate
monitoring of resource utilization and helps identify bottlenecks in the
execution pipeline.
Base automatically changed from gabrielmusat/max-gauge to main June 11, 2026 07:10
@gabotechs gabotechs force-pushed the gabrielmusat/producer-head branch from 8ab2da5 to 657a531 Compare June 11, 2026 07:11
@gabotechs gabotechs merged commit 2f75468 into main Jun 11, 2026
17 checks passed
@gabotechs gabotechs deleted the gabrielmusat/producer-head branch June 11, 2026 07:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants