Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(path: ":model:fn-execution", configuration: "shadow")
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_69_0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.values.CausedByDrain;

/**
* Encapsulates metadata that propagates with elements in the pipeline.
*
* <p>This metadata is sent along with elements. It currently includes fields like {@link
* CausedByDrain}, and is designed to be extensible to support future metadata fields such as
* OpenTelemetry context or CDC (Change Data Capture) kind.
*
* <p>The purpose of this class is to group targeted metadata fields together. This makes it easier
* to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
* multiple elements are merged or grouped, without having to extend method signatures or state
* handling for every new metadata field.
*/
@AutoValue
public abstract class CombinedMetadata {
/** Returns the {@link CausedByDrain} value carried by this metadata. */
public abstract CausedByDrain causedByDrain();

/**
* Creates a {@code CombinedMetadata} holding the given drain marker.
*
* @param causedByDrain the value later returned by {@link #causedByDrain()}
* @return a new instance of the AutoValue-generated implementation
*/
public static CombinedMetadata create(CausedByDrain causedByDrain) {
return new AutoValue_CombinedMetadata(causedByDrain);
}

/**
* Creates the default metadata, whose drain marker is {@link CausedByDrain#NORMAL}.
*
* @return metadata equivalent to {@code create(CausedByDrain.NORMAL)}
*/
public static CombinedMetadata createDefault() {
return create(CausedByDrain.NORMAL);
}

public static class Coder extends AtomicCoder<CombinedMetadata> {
private static final Coder INSTANCE = new Coder();

/** Returns the shared {@code Coder} instance held in {@code INSTANCE}. */
public static Coder of() {
return INSTANCE;
}

@Override
public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
if (value == null) {
NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream);
return;
}
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
builder.setDrain(
value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);

NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream);
Comment on lines +64 to +74
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Coder implementation uses NullableCoder.of(ByteArrayCoder.of()) to wrap the serialized proto bytes. This adds unnecessary overhead (extra bytes for nullability and length prefixing) since AtomicCoder can handle the serialization directly. Furthermore, ByteArrayCoder is redundant if you are already managing the byte array from the proto. Consider simplifying the coder to write the proto bytes directly to the OutputStream with a simple length prefix if necessary.

}

/**
 * Reads one {@code CombinedMetadata} from the stream.
 *
 * <p>The payload is read as a nullable byte array. A null payload yields the default metadata;
 * otherwise the bytes are parsed as an {@code ElementMetadata} proto and its drain mode is mapped
 * back to a {@link CausedByDrain} value.
 *
 * @param inStream stream positioned at an encoded value
 * @return the decoded metadata, never {@code null}
 * @throws IOException if reading or proto parsing fails
 */
@Override
public CombinedMetadata decode(InputStream inStream) throws IOException {
  final byte[] payload = NullableCoder.of(ByteArrayCoder.of()).decode(inStream);
  if (payload == null) {
    // A null payload maps to the default metadata.
    return CombinedMetadata.createDefault();
  }

  final BeamFnApi.Elements.DrainMode.Enum drainMode =
      BeamFnApi.Elements.ElementMetadata.parseFrom(payload).getDrain();

  // Only DRAINING maps to CAUSED_BY_DRAIN; every other drain mode is treated as NORMAL.
  if (drainMode == BeamFnApi.Elements.DrainMode.Enum.DRAINING) {
    return CombinedMetadata.create(CausedByDrain.CAUSED_BY_DRAIN);
  }
  return CombinedMetadata.create(CausedByDrain.NORMAL);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;

import org.apache.beam.sdk.values.CausedByDrain;

/**
 * Combines {@link CombinedMetadata} values by delegating each metadata field to a dedicated
 * field-level combiner and re-assembling the result.
 */
class CombinedMetadataCombiner implements MetadataCombiner<CombinedMetadata> {
  private static final CombinedMetadataCombiner INSTANCE = new CombinedMetadataCombiner();

  /** Combiner responsible for the {@code causedByDrain} field. */
  private final CausedByDrainCombiner causedByDrainCombiner = CausedByDrainCombiner.of();

  /** Returns the shared combiner instance. */
  public static CombinedMetadataCombiner of() {
    return INSTANCE;
  }

  /** Builds the starting accumulator from each field combiner's own starting value. */
  @Override
  public CombinedMetadata createAccumulator() {
    CausedByDrain initialDrain = causedByDrainCombiner.createAccumulator();
    return CombinedMetadata.create(initialDrain);
  }

  /** Merges {@code input} into {@code accumulator} field by field and returns the result. */
  @Override
  public CombinedMetadata addInput(CombinedMetadata accumulator, CombinedMetadata input) {
    CausedByDrain mergedDrain =
        causedByDrainCombiner.addInput(accumulator.causedByDrain(), input.causedByDrain());
    return CombinedMetadata.create(mergedDrain);
  }

  /**
   * Combines {@link CausedByDrain} values: the result is {@code CAUSED_BY_DRAIN} as soon as either
   * side is, and {@code NORMAL} otherwise.
   */
  static class CausedByDrainCombiner implements MetadataCombiner<CausedByDrain> {
    private static final CausedByDrainCombiner INSTANCE = new CausedByDrainCombiner();

    /** Returns the shared combiner instance. */
    public static CausedByDrainCombiner of() {
      return INSTANCE;
    }

    /** Starts from {@code NORMAL}, i.e. nothing drained yet. */
    @Override
    public CausedByDrain createAccumulator() {
      return CausedByDrain.NORMAL;
    }

    /** Returns {@code CAUSED_BY_DRAIN} if either argument is, otherwise {@code NORMAL}. */
    @Override
    public CausedByDrain addInput(CausedByDrain current, CausedByDrain input) {
      boolean eitherDrained =
          current == CausedByDrain.CAUSED_BY_DRAIN || input == CausedByDrain.CAUSED_BY_DRAIN;
      return eitherDrained ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;

/**
 * Contract for folding pipeline metadata values of one type into a single accumulated value.
 *
 * @param <MetadataT> the metadata type being accumulated
 */
interface MetadataCombiner<MetadataT> {
  /** Returns the accumulator value to start from before any input has been added. */
  MetadataT createAccumulator();

  /**
   * Combines one more input with the accumulated value.
   *
   * @param accumulator the value accumulated so far
   * @param input the new value to fold in
   * @return the accumulated value after taking {@code input} into account
   */
  MetadataT addInput(MetadataT accumulator, MetadataT input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
Expand Down Expand Up @@ -107,6 +108,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
* </ul>
*/
static final StateTag<ValueState<CombinedMetadata>> METADATA_TAG =
StateTags.makeSystemTagInternal(
StateTags.value("combinedMetadata", CombinedMetadata.Coder.of()));

private final WindowingStrategy<Object, W> windowingStrategy;

private final WindowedValueReceiver<KV<K, OutputT>> outputter;
Expand Down Expand Up @@ -376,7 +381,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
emit(
contextFactory.base(window, StateStyle.DIRECT),
contextFactory.base(window, StateStyle.RENAMED),
CausedByDrain.NORMAL);
CombinedMetadata.createDefault());
}

// We're all done with merging and emitting elements so can compress the activeWindow state.
Expand Down Expand Up @@ -590,6 +595,17 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
value.getTimestamp(),
StateStyle.DIRECT,
value.causedByDrain());

ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
CombinedMetadata currentMetadata = metadataState.read();
if (currentMetadata == null) {
currentMetadata = CombinedMetadata.createDefault();
}
CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain());
CombinedMetadata newMetadata =
CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata);
metadataState.write(newMetadata);
Comment on lines +599 to +607
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This block introduces a state read and write for every single element processed by ReduceFnRunner. This is a significant performance regression, especially for high-throughput pipelines where causedByDrain is typically NORMAL. Additionally, this logic is currently executed before checking if the window is closed (line 609), meaning state is updated even for elements that will be dropped.

Consider optimizing this to avoid state access in the common case and moving it after the isClosed check.

Suggested change
ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
CombinedMetadata currentMetadata = metadataState.read();
if (currentMetadata == null) {
currentMetadata = CombinedMetadata.createDefault();
}
CombinedMetadata inputMetadata = CombinedMetadata.create(value.causedByDrain());
CombinedMetadata newMetadata =
CombinedMetadataCombiner.of().addInput(currentMetadata, inputMetadata);
metadataState.write(newMetadata);
if (value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
CombinedMetadata currentMetadata = metadataState.read();
if (currentMetadata == null || currentMetadata.causedByDrain() != CausedByDrain.CAUSED_BY_DRAIN) {
metadataState.write(CombinedMetadata.create(CausedByDrain.CAUSED_BY_DRAIN));
}
}


if (triggerRunner.isClosed(directContext.state())) {
// This window has already been closed.
droppedDueToClosedWindow.inc();
Expand Down Expand Up @@ -649,15 +665,15 @@ private class WindowActivation {
// garbage collect the window. We'll consider any timer at or after the
// end-of-window time to be a signal to garbage collect.
public final boolean isGarbageCollection;
public final CausedByDrain causedByDrain;
public final CombinedMetadata combinedMetadata;

WindowActivation(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
CausedByDrain causedByDrain) {
CombinedMetadata combinedMetadata) {
this.directContext = directContext;
this.renamedContext = renamedContext;
this.causedByDrain = causedByDrain;
this.combinedMetadata = combinedMetadata;
W window = directContext.window();

// The output watermark is before the end of the window if it is either unknown
Expand Down Expand Up @@ -742,7 +758,8 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
contextFactory.base(window, StateStyle.RENAMED);
WindowActivation windowActivation =
new WindowActivation(directContext, renamedContext, timer.causedByDrain());
new WindowActivation(
directContext, renamedContext, CombinedMetadata.create(timer.causedByDrain()));
windowActivations.put(window, windowActivation);

// Perform prefetching of state to determine if the trigger should fire.
Expand Down Expand Up @@ -778,7 +795,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
directContext.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime(),
windowActivation.causedByDrain);
windowActivation.combinedMetadata.causedByDrain());

boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen();
if (windowIsActiveAndOpen) {
Expand All @@ -792,7 +809,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
renamedContext,
true /* isFinished */,
windowActivation.isEndOfWindow,
windowActivation.causedByDrain);
windowActivation.combinedMetadata);
checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
}

Expand All @@ -810,7 +827,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {
if (windowActivation.windowIsActiveAndOpen()
&& triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
emit(directContext, renamedContext, windowActivation.causedByDrain);
emit(directContext, renamedContext, windowActivation.combinedMetadata);
}

if (windowActivation.isEndOfWindow) {
Expand Down Expand Up @@ -874,6 +891,7 @@ private void clearAllState(
triggerRunner.clearState(
directContext.window(), directContext.timers(), directContext.state());
paneInfoTracker.clear(directContext.state());
directContext.state().access(METADATA_TAG).clear();
} else {
// If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
// For (1), if !activeWindows.isActive then the window must be merging and has been
Expand Down Expand Up @@ -934,8 +952,9 @@ private void prefetchEmit(
private void emit(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
CausedByDrain causedByDrain)
CombinedMetadata metadata)
throws Exception {

checkState(
triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state()));
Expand All @@ -950,13 +969,14 @@ private void emit(
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, causedByDrain);
onTrigger(directContext, renamedContext, isFinished, false /*isEndOfWindow*/, metadata);

// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());

// Cleanup buffered data if appropriate
if (shouldDiscard) {
directContext.state().access(METADATA_TAG).clear();
// Cleanup flavor C: The user does not want any buffered data to persist between panes.
reduceFn.clearState(renamedContext);
}
Expand Down Expand Up @@ -1009,8 +1029,19 @@ private void prefetchOnTrigger(
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
final boolean isFinished,
boolean isEndOfWindow,
CausedByDrain causedByDrain)
CombinedMetadata metadata)
throws Exception {
ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
CombinedMetadata aggregatedMetadata = metadataState.read();
if (aggregatedMetadata == null) {
aggregatedMetadata = CombinedMetadata.createDefault();
}
CombinedMetadata fullyAggregatedMetadata =
CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
if (isFinished) {
metadataState.clear();
}
Comment on lines +1034 to +1044
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In onTrigger, the metadata from the trigger/timer is combined with the persisted state, but the result is not written back to state if the window remains open (isFinished == false). This means that any metadata associated with the trigger (e.g., a drain signal from a timer) will be lost for subsequent panes of the same window. To ensure "stateful tracking" as intended, the aggregated metadata should be persisted if the window remains open.

Suggested change
ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
CombinedMetadata aggregatedMetadata = metadataState.read();
if (aggregatedMetadata == null) {
aggregatedMetadata = CombinedMetadata.createDefault();
}
CombinedMetadata fullyAggregatedMetadata =
CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
if (isFinished) {
metadataState.clear();
}
ValueState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
CombinedMetadata aggregatedMetadata = metadataState.read();
CombinedMetadata fullyAggregatedMetadata =
CombinedMetadataCombiner.of()
.addInput(
aggregatedMetadata == null ? CombinedMetadata.createDefault() : aggregatedMetadata,
metadata);
final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
if (isFinished) {
metadataState.clear();
} else if (!fullyAggregatedMetadata.equals(aggregatedMetadata)) {
metadataState.write(fullyAggregatedMetadata);
}

// Extract the window hold, and as a side effect clear it.
final WatermarkHold.OldAndNewHolds pair =
watermarkHold.extractAndRelease(renamedContext, isFinished).read();
Expand Down Expand Up @@ -1081,12 +1112,12 @@ private void prefetchOnTrigger(
.setValue(KV.of(key, toOutput))
.setTimestamp(outputTimestamp)
.setWindows(windows)
.setCausedByDrain(causedByDrain)
.setCausedByDrain(aggregatedCausedByDrain)
.setPaneInfo(paneInfo)
.setReceiver(outputter)
.output();
},
causedByDrain);
aggregatedCausedByDrain);

reduceFn.onTrigger(renamedTriggerContext);
}
Expand Down
Loading
Loading