Lazily set the producer head at execution time#478
Conversation
8e539eb to
c1922d9
Compare
c1922d9 to
b505fdb
Compare
520fe2c to
82af353
Compare
This is one PR from the following stack of PRs: - #477 <- you are here - #463 - #464 - #478 - #479 - #432 --- Network boundaries in this project are currently breaking one assumption from upstream DataFusion: `SendableRecordBatchStream`s yield record batches in two situations: - If explicitly polled - Eagerly on an spawned task triggered by the first poll https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/physical-plan/src/execution_plan.rs#L973-L988 Today, network boundaries pulling from remote sources are breaking this rule, because they start yielding `RecordBatches` over the network even if no poll has ever happened to the `SendableRecordBatchStream` returned by the network boundary. This has two consequences: 1. Greater memory consumption, as data will get accumulated in the network boundaries while nobody is polling for it. 2. Greater speed on JOINs, as an artifact of eagerly buffering right sides even before they are ever polled Consequence 2 is nice, but it should be delivered using standard upstream mechanisms: https://github.com/apache/datafusion/blob/d9ea38b95123159161c017840d3e6256e41988dd/datafusion/common/src/config.rs#L695-L709 Not accidentally by how remote network boundaries work. --- This PR makes it so that remote network boundaries only start the network stream on first poll, instead of on `.execute()` call, as stated by the `EvaluationType::Eager` docs.
b505fdb to
b1b8dcb
Compare
82af353 to
b2f8e5e
Compare
b1b8dcb to
440b3f5
Compare
b2f8e5e to
ca79034
Compare
440b3f5 to
1968bb7
Compare
ca79034 to
431d5a2
Compare
1968bb7 to
d33afcc
Compare
431d5a2 to
fe92488
Compare
4496e3f to
ed657d2
Compare
fe92488 to
7b7b571
Compare
ed657d2 to
ce7fb75
Compare
7b7b571 to
423e39a
Compare
ce7fb75 to
441badb
Compare
423e39a to
8ab2da5
Compare
| } | ||
|
|
||
| Ok(input) | ||
| let input = if let Some(r_exec) = input.downcast_ref::<RepartitionExec>() { |
There was a problem hiding this comment.
Why not move this into the match below and error if something unexpected happens? Ex. if you match on ProducerHead::BroadcastExec and input.downcast_ref::<BroadcastExec>() is None, then error?
There was a problem hiding this comment.
There's not really anything unexpected that can happen here. Actually, the only reason for this function to return a Result<> is because upstream RepartitionExec::try_new() signature requires it.
If the passed ProducerHead is a ProducerHead::BroadcastExec, but the input node is a RepartitionExec, this is fine and expected, it just means that we need to swap the RepartitionExec by a BroadcastExec.
This happens typically while re-ordering JOIN operations on the fly, maybe we swap what was a shuffle join by a broadcast join, and we need to swap the producer head at runtime.
Of course, that example is not possible in this PR, but it will start happening relatively soon.
| } | ||
|
|
||
| /// Defines what shape should the head node of a stage have upon getting executed. Depending | ||
| /// on the [NetworkBoundary] implementation, the stage below should have different head nodes. |
There was a problem hiding this comment.
mega nit: in upstream they add backticks like [NetworkBoundary]
There was a problem hiding this comment.
Yeah, I've been noticing for a while. I think this has to do in how things render in docs.rs. We should probably actually:
- Render this on docs.rs
- If we think it's indeed weird, do a PR fully dedicated to adding backticks in all docs at once
There was a problem hiding this comment.
Created this issue for tracking:
|
|
||
| /// Defines what shape should the head node of a stage have upon getting executed. Depending | ||
| /// on the [NetworkBoundary] implementation, the stage below should have different head nodes. | ||
| pub enum ProducerHead { |
There was a problem hiding this comment.
nit: maybe NetworkBoundaryProducer is a better name. I don't feel that strongly though.
There was a problem hiding this comment.
I like the fact that the "Head" keyword is in there, as it states that we are not referring to a full plan, and we are referring to just the head of it.
| properties: Arc::new(PlanProperties::new( | ||
| EquivalenceProperties::new(Arc::clone(&self.schema)), | ||
| Partitioning::UnknownPartitioning(self.bench.partitions), | ||
| Partitioning::RoundRobinBatch(self.bench.partitions), |
There was a problem hiding this comment.
Isn't the ProducerHead of a NetworkShuffleExec always a RepartitionExec with Hash partitioning? So the output partitioning of the NetworkShuffleExec is Hash as well.
There was a problem hiding this comment.
Today, that happens always yes, but there are legitimate cases for this to be a RoundRobin shuffle: whenever a JOIN order in a CollectLeft join is swapped:
- The left side goes to the right, moving from a broadcast to a round robin shuffle (as we don't have a set of expressions to hash)
- The right side goes to the left, and whatever it was, it gets swapped to being a broadcast
I needed to change this line here today just because otherwise, this benchmark will try to set a ProducerHead in the producing stage to RepartitionExec(UnknownPartitioning), which fails to build in DataFusion. Instead, a RoundRobin is passed so that we can build that RepartitionExec successfully, which does not really matter what it is here, as this is just a benchmark for the transport part.
This is one PR from the following stack of PRs: - #477 - #463 <- you are here - #464 - #478 - #479 - #486 - #432 This PR introduces a NetworkBoundaryBuilder argument to the network boundary injection logic, allowing more flexible and configurable strategies for determining which exchanges require network communication. This enables better optimization of data movement across distributed tasks.
This is one PR from the following stack of PRs: - #477 - #463 - #464 <- you are here - #478 - #479 - #486 - #432 This PR introduces a MaxGauge metric to provide better tracking of peak values in distributed metrics collection. This enables more accurate monitoring of resource utilization and helps identify bottlenecks in the execution pipeline.
8ab2da5 to
657a531
Compare
This is one PR from the following stack of PRs:
Introduces the
ProducerHeadtype:Which is passed over the network while remotely executing tasks in order to set the appropriate node at the head of a stage.
Today, this is a noop because the right head node in stages is ensured statically at planning time, but in follow up PRs, network boundaries can get swamped and reorganized arbitrarily.
One example that happens in AQE:
Stage 1is huge, and during AQE the JOINs are swappedStage 1is now on the probe side, so it needs to be rewritten to aNetworkShuffleExec, otherwise duplicate data will be returned:Passing a
ProducerHeadat execution time unlocks two things:BroadcastExecorRepartitionExecbased on the network boundary above, which might have changed because of AQE