
[Drain] Support extensible element metadata propagation in ReduceFnRunner #38230

Open
stankiewicz wants to merge 3 commits into apache:master from stankiewicz:drain_combiner

Conversation

@stankiewicz (Contributor) commented Apr 17, 2026

Description

This PR refactors the metadata propagation logic in ReduceFnRunner to support extensible metadata.

Previously, metadata tracking (specifically causedByDrain) was not stored in state at all during execution in ReduceFnRunner, which caused metadata-loss failures when firing timers or merging panes. This PR fixes that by introducing a unified state map for element metadata.

To make it easier to add future payloads (such as OpenTelemetry context maps or CDC insert/update markers) without modifying method signatures, this change groups the affected fields into a unified container and moves the combination rules into a dedicated aggregator class.

Key Changes

  • Runner Fix: Added stateful tracking using METADATA_TAG (persisting CombinedMetadata) in ReduceFnRunner, ensuring metadata is no longer lost during grouping.
  • Extensible Container: Created CombinedMetadata (an @AutoValue class) that groups element metadata together.
  • Metadata Combiners: Encapsulated combination logic within CombinedMetadataCombiner.
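To make the shape of these changes concrete, here is a minimal plain-Java sketch of the container and combiner described above. The names mirror the PR (CombinedMetadata, CombinedMetadataCombiner, CausedByDrain), but the bodies are illustrative assumptions, not the actual Beam implementation: the "sticky drain" combination rule is inferred from the review discussion in this thread, and the real classes use @AutoValue and Beam's combiner plumbing.

```java
// Hypothetical sketch only; names follow the PR, bodies are assumptions.
enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

final class CombinedMetadata {
  private final CausedByDrain causedByDrain;

  private CombinedMetadata(CausedByDrain causedByDrain) {
    this.causedByDrain = causedByDrain;
  }

  static CombinedMetadata create(CausedByDrain causedByDrain) {
    return new CombinedMetadata(causedByDrain);
  }

  // Default metadata for elements with no drain marker.
  static CombinedMetadata createDefault() {
    return new CombinedMetadata(CausedByDrain.NORMAL);
  }

  CausedByDrain causedByDrain() {
    return causedByDrain;
  }
}

final class CombinedMetadataCombiner {
  static CombinedMetadataCombiner of() {
    return new CombinedMetadataCombiner();
  }

  // Assumed rule: the drain marker is "sticky" — once any combined input was
  // caused by drain, the aggregate stays CAUSED_BY_DRAIN.
  CombinedMetadata addInput(CombinedMetadata accum, CombinedMetadata input) {
    if (accum.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
        || input.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
      return CombinedMetadata.create(CausedByDrain.CAUSED_BY_DRAIN);
    }
    return CombinedMetadata.createDefault();
  }
}
```

Grouping fields into one container like this means future payloads (e.g. a trace-context map) add an accessor and a combination rule, without touching ReduceFnRunner's method signatures.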

Open question

  • Is this a breaking change?

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@stankiewicz (Contributor, Author) commented:

R: @kennknowles

@github-actions commented:

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers".

@stankiewicz stankiewicz changed the title [Refactor] Support extensible element metadata propagation in ReduceFnRunner [Drain] Support extensible element metadata propagation in ReduceFnRunner Apr 17, 2026
@apache apache deleted a comment from gemini-code-assist Bot Apr 17, 2026
@kennknowles kennknowles self-requested a review April 20, 2026 14:08
@stankiewicz (Contributor, Author) commented:

@kennknowles this is generally finished; the task :runners:core-java:analyzeClassesDependencies is failing because of an undeclared dependency on beam.model:fn-execution. I need to add the missing dependency in runners/core-java/build.gradle.
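For reference, a dependency declaration of roughly this shape is what such a fix usually looks like in Beam's Gradle setup. The exact project path and configuration name here are assumptions, not copied from the PR:

```
// runners/core-java/build.gradle — hypothetical fix for the undeclared dependency
dependencies {
  // Assumed coordinates/configuration; verify against Beam's build conventions.
  implementation project(path: ":model:fn-execution", configuration: "shadow")
}
```

Declaring the dependency explicitly is what satisfies the analyzeClassesDependencies check, which flags classes used at compile time but pulled in only transitively.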

@stankiewicz (Contributor, Author) commented:

/gemini review

@gemini-code-assist (bot) left a comment:

Code Review

This pull request introduces a CombinedMetadata class and associated logic to support stateful propagation of pipeline metadata within ReduceFnRunner. The changes include new combiner and coder implementations, as well as updates to state handling and tests. Feedback identifies a performance regression due to per-element state access in processElement, a bug in onTrigger where metadata is lost when windows remain open, and an optimization opportunity for the CombinedMetadata coder to reduce serialization overhead.

Comment on lines +599 to +607
    ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
    CombinedMetadata currentMetadata = metadataState.read();
    if (currentMetadata == null) {
      currentMetadata = CombinedMetadata.createDefault();
    }
    CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain());
    CombinedMetadata newMetadata =
        CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata);
    metadataState.write(newMetadata);
Severity: high

This block introduces a state read and write for every single element processed by ReduceFnRunner. This is a significant performance regression, especially for high-throughput pipelines where causedByDrain is typically NORMAL. Additionally, this logic is currently executed before checking if the window is closed (line 609), meaning state is updated even for elements that will be dropped.

Consider optimizing this to avoid state access in the common case and moving it after the isClosed check.

Suggested change
-    ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
-    CombinedMetadata currentMetadata = metadataState.read();
-    if (currentMetadata == null) {
-      currentMetadata = CombinedMetadata.createDefault();
-    }
-    CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain());
-    CombinedMetadata newMetadata =
-        CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata);
-    metadataState.write(newMetadata);
+    if (value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
+      ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
+      CombinedMetadata currentMetadata = metadataState.read();
+      if (currentMetadata == null
+          || currentMetadata.causedByDrain() != CausedByDrain.CAUSED_BY_DRAIN) {
+        metadataState.write(CombinedMetadata.create(CausedByDrain.CAUSED_BY_DRAIN));
+      }
+    }

Comment on lines +1034 to +1044
    ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
    CombinedMetadata aggregatedMetadata = metadataState.read();
    if (aggregatedMetadata == null) {
      aggregatedMetadata = CombinedMetadata.createDefault();
    }
    CombinedMetadata fullyAggregatedMetadata =
        CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
    final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
    if (isFinished) {
      metadataState.clear();
    }
Severity: high

In onTrigger, the metadata from the trigger/timer is combined with the persisted state, but the result is not written back to state if the window remains open (isFinished == false). This means that any metadata associated with the trigger (e.g., a drain signal from a timer) will be lost for subsequent panes of the same window. To ensure "stateful tracking" as intended, the aggregated metadata should be persisted if the window remains open.

Suggested change
-    ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
-    CombinedMetadata aggregatedMetadata = metadataState.read();
-    if (aggregatedMetadata == null) {
-      aggregatedMetadata = CombinedMetadata.createDefault();
-    }
-    CombinedMetadata fullyAggregatedMetadata =
-        CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
-    final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
-    if (isFinished) {
-      metadataState.clear();
-    }
+    ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
+    CombinedMetadata aggregatedMetadata = metadataState.read();
+    CombinedMetadata fullyAggregatedMetadata =
+        CombinedMetadataCombiner.of()
+            .addInput(
+                aggregatedMetadata == null ? CombinedMetadata.createDefault() : aggregatedMetadata,
+                metadata);
+    final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
+    if (isFinished) {
+      metadataState.clear();
+    } else if (!fullyAggregatedMetadata.equals(aggregatedMetadata)) {
+      metadataState.write(fullyAggregatedMetadata);
+    }

Comment on lines +64 to +74
      NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream);
      return;
    }
    BeamFnApi.Elements.ElementMetadata.Builder builder =
        BeamFnApi.Elements.ElementMetadata.newBuilder();
    builder.setDrain(
        value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
            ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
            : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);

    NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream);
Severity: medium

The Coder implementation uses NullableCoder.of(ByteArrayCoder.of()) to wrap the serialized proto bytes. This adds unnecessary overhead (extra bytes for nullability and length prefixing) since AtomicCoder can handle the serialization directly. Furthermore, ByteArrayCoder is redundant if you are already managing the byte array from the proto. Consider simplifying the coder to write the proto bytes directly to the OutputStream with a simple length prefix if necessary.
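To illustrate the reviewer's point, here is a self-contained sketch of length-prefixed encoding with plain Java streams. This is an assumed illustration, not the PR's actual coder: compared with wrapping the bytes in NullableCoder.of(ByteArrayCoder.of()), writing the payload directly avoids the extra nullability byte and the nested length prefix (the real Beam coders use a varint prefix rather than the fixed 4-byte int used here for simplicity).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch: encode a serialized payload (e.g. proto bytes) with a
// single length prefix, instead of NullableCoder.of(ByteArrayCoder.of()).
final class LengthPrefixedEncoding {
  static void encode(byte[] payload, OutputStream out) throws IOException {
    DataOutputStream dos = new DataOutputStream(out);
    dos.writeInt(payload.length); // simple 4-byte length prefix
    dos.write(payload);
    dos.flush();
  }

  static byte[] decode(InputStream in) throws IOException {
    DataInputStream dis = new DataInputStream(in);
    byte[] payload = new byte[dis.readInt()];
    dis.readFully(payload);
    return payload;
  }
}
```

A round trip through a ByteArrayOutputStream/ByteArrayInputStream pair produces exactly prefix + payload bytes, which is the overhead reduction the review is pointing at.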
