Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return timestamp();
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return elem.causedByDrain();
}
Comment on lines +559 to +561
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

medium: This method should include a Javadoc comment explaining its purpose and behavior, similar to other methods in this class.

   /**
    * Provide a reference to the caused by drain.
    */
    @Override
    public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
      return elem.causedByDrain();
    }


@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -831,6 +836,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return timestamp();
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return causedByDrain;
}
Comment on lines +840 to +842
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

medium: This method should include a Javadoc comment explaining its purpose and behavior, similar to other methods in this class.

   /**
    * Provide a reference to the caused by drain.
    */
    @Override
    public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
      return causedByDrain;
    }


@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
return timerId;
Expand Down Expand Up @@ -1119,6 +1129,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return timestamp;
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("CausedByDrain parameters are not supported.");
}
Comment on lines +1133 to +1135
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

medium: This method should include a Javadoc comment explaining its purpose and behavior, similar to other methods in this class.

   /**
    * Provide a reference to the caused by drain.
    */
    @Override
    public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
      throw new UnsupportedOperationException("CausedByDrain parameters are not supported.");
    }


@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException("Timer parameters are not supported.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
Expand Down Expand Up @@ -126,6 +127,7 @@ class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
public static final String ELEMENT_PARAMETER_METHOD = "element";
public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement";
public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp";
public static final String CAUSED_BY_DRAIN_PARAMETER_METHOD = "causedByDrain";
public static final String BUNDLE_FINALIZER_PARAMETER_METHOD = "bundleFinalizer";
public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver";
public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain";
Expand Down Expand Up @@ -1100,6 +1102,15 @@ public StackManipulation dispatch(TimestampParameter p) {
TIMESTAMP_PARAMETER_METHOD, DoFn.class)));
}

@Override
public StackManipulation dispatch(CausedByDrainParameter p) {
return new StackManipulation.Compound(
pushDelegate,
MethodInvocation.invoke(
getExtraContextFactoryMethodDescription(
CAUSED_BY_DRAIN_PARAMETER_METHOD, DoFn.class)));
}

@Override
public StackManipulation dispatch(BundleFinalizerParameter p) {
return simpleExtraContextParameter(BUNDLE_FINALIZER_PARAMETER_METHOD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -217,6 +218,9 @@ interface ArgumentProvider<InputT, OutputT> {
/** Provide a reference to the input element timestamp. */
Instant timestamp(DoFn<InputT, OutputT> doFn);

/** Provide a reference to the caused by drain. */
CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn);

/** Provide a reference to the time domain for a timer firing. */
TimeDomain timeDomain(DoFn<InputT, OutputT> doFn);

Expand Down Expand Up @@ -325,6 +329,12 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
String.format("Timestamp unsupported in %s", getErrorContext()));
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
String.format("CausedByDrain unsupported in %s", getErrorContext()));
Comment thread
stankiewicz marked this conversation as resolved.
}

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -514,6 +524,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return delegate.timestamp(doFn);
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return delegate.causedByDrain(doFn);
}
Comment thread
stankiewicz marked this conversation as resolved.

@Override
public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
return delegate.timeDomain(doFn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ public <ResultT> ResultT match(Cases<ResultT> cases) {
return cases.dispatch((TimerIdParameter) this);
} else if (this instanceof BundleFinalizerParameter) {
return cases.dispatch((BundleFinalizerParameter) this);
} else if (this instanceof CausedByDrainParameter) {
return cases.dispatch((CausedByDrainParameter) this);
} else if (this instanceof KeyParameter) {
return cases.dispatch((KeyParameter) this);
} else {
Expand Down Expand Up @@ -400,6 +402,8 @@ public interface Cases<ResultT> {

ResultT dispatch(BundleFinalizerParameter p);

ResultT dispatch(CausedByDrainParameter p);

ResultT dispatch(KeyParameter p);

/** A base class for a visitor with a default method for cases it is not interested in. */
Expand Down Expand Up @@ -497,6 +501,11 @@ public ResultT dispatch(BundleFinalizerParameter p) {
return dispatchDefault(p);
}

@Override
public ResultT dispatch(CausedByDrainParameter p) {
return dispatchDefault(p);
}

@Override
public ResultT dispatch(StateParameter p) {
return dispatchDefault(p);
Expand Down Expand Up @@ -552,6 +561,8 @@ public ResultT dispatch(KeyParameter p) {
new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter();
private static final BundleFinalizerParameter BUNDLE_FINALIZER_PARAMETER =
new AutoValue_DoFnSignature_Parameter_BundleFinalizerParameter();
private static final CausedByDrainParameter CAUSED_BY_DRAIN_PARAMETER =
new AutoValue_DoFnSignature_Parameter_CausedByDrainParameter();
private static final OnWindowExpirationContextParameter ON_WINDOW_EXPIRATION_CONTEXT_PARAMETER =
new AutoValue_DoFnSignature_Parameter_OnWindowExpirationContextParameter();

Expand All @@ -575,6 +586,11 @@ public static BundleFinalizerParameter bundleFinalizer() {
return BUNDLE_FINALIZER_PARAMETER;
}

/** Returns a {@link CausedByDrainParameter}. */
public static CausedByDrainParameter causedByDrainParameter() {
return CAUSED_BY_DRAIN_PARAMETER;
}

public static ElementParameter elementParameter(TypeDescriptor<?> elementT) {
return new AutoValue_DoFnSignature_Parameter_ElementParameter(elementT);
}
Expand Down Expand Up @@ -727,6 +743,16 @@ public abstract static class BundleFinalizerParameter extends Parameter {
BundleFinalizerParameter() {}
}

/**
* Descriptor for a {@link Parameter} of type {@link org.apache.beam.sdk.values.CausedByDrain}.
*
* <p>All such descriptors are equal.
*/
@AutoValue
public abstract static class CausedByDrainParameter extends Parameter {
CausedByDrainParameter() {}
}

/**
* Descriptor for a {@link Parameter} of type {@link DoFn.Element}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -139,6 +140,7 @@ private DoFnSignatures() {}
Parameter.StateParameter.class,
Parameter.SideInputParameter.class,
Parameter.TimerFamilyParameter.class,
Parameter.CausedByDrainParameter.class,
Parameter.BundleFinalizerParameter.class);

private static final ImmutableList<Class<? extends Parameter>>
Expand All @@ -155,6 +157,7 @@ private DoFnSignatures() {}
Parameter.RestrictionTrackerParameter.class,
Parameter.WatermarkEstimatorParameter.class,
Parameter.SideInputParameter.class,
Parameter.CausedByDrainParameter.class,
Parameter.BundleFinalizerParameter.class);

private static final ImmutableList<Class<? extends Parameter>> ALLOWED_SETUP_PARAMETERS =
Expand Down Expand Up @@ -185,6 +188,7 @@ private DoFnSignatures() {}
Parameter.StateParameter.class,
Parameter.TimerFamilyParameter.class,
Parameter.TimerIdParameter.class,
Parameter.CausedByDrainParameter.class,
Parameter.KeyParameter.class);

private static final ImmutableList<Class<? extends Parameter>>
Expand All @@ -201,6 +205,7 @@ private DoFnSignatures() {}
Parameter.StateParameter.class,
Parameter.TimerFamilyParameter.class,
Parameter.TimerIdParameter.class,
Parameter.CausedByDrainParameter.class,
Parameter.KeyParameter.class);

private static final Collection<Class<? extends Parameter>>
Expand Down Expand Up @@ -1357,6 +1362,11 @@ private static Parameter analyzeExtraParameter(
return Parameter.keyT(paramT);
} else if (rawType.equals(TimeDomain.class)) {
return Parameter.timeDomainParameter();
} else if (CausedByDrain.class.isAssignableFrom(rawType)) {
methodErrors.checkArgument(
rawType.equals(CausedByDrain.class),
"CausedByDrain argument must have type org.apache.beam.sdk.values.CausedByDrain.");
return Parameter.causedByDrainParameter();
Comment thread
stankiewicz marked this conversation as resolved.
} else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
String sideInputId = getSideInputId(param.getAnnotations());
paramErrors.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return outerContext.timestamp();
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return outerContext.causedByDrain();
}
Comment thread
stankiewicz marked this conversation as resolved.

@Override
public String timerId(DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
Expand All @@ -78,6 +79,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -130,10 +132,11 @@ public void process(
PipelineOptions options,
@SideInput("tag1") String input1,
@SideInput("tag2") Integer input2,
BundleFinalizer bundleFinalizer) {}
BundleFinalizer bundleFinalizer,
CausedByDrain causedByDrain) {}
}.getClass());

assertThat(sig.processElement().extraParameters().size(), equalTo(9));
assertThat(sig.processElement().extraParameters().size(), equalTo(10));
assertThat(sig.processElement().extraParameters().get(0), instanceOf(ElementParameter.class));
assertThat(sig.processElement().extraParameters().get(1), instanceOf(TimestampParameter.class));
assertThat(sig.processElement().extraParameters().get(2), instanceOf(WindowParameter.class));
Expand All @@ -146,6 +149,8 @@ public void process(
assertThat(sig.processElement().extraParameters().get(7), instanceOf(SideInputParameter.class));
assertThat(
sig.processElement().extraParameters().get(8), instanceOf(BundleFinalizerParameter.class));
assertThat(
sig.processElement().extraParameters().get(9), instanceOf(CausedByDrainParameter.class));
}

@Test
Expand Down Expand Up @@ -585,6 +590,31 @@ public void onTimer(BoundedWindow w) {}
instanceOf(WindowParameter.class));
}

@Test
public void testCausedByDrainOnTimer() throws Exception {
final String timerId = "some-timer-id";
final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;

DoFnSignature sig =
DoFnSignatures.getSignature(
new DoFn<String, String>() {

@TimerId(timerId)
private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void process(ProcessContext c) {}

@OnTimer(timerId)
public void onTimer(CausedByDrain causedByDrain) {}
}.getClass());

assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(1));
assertThat(
sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
instanceOf(CausedByDrainParameter.class));
}

@Test
public void testAllParamsOnTimer() throws Exception {
final String timerId = "some-timer-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1804,6 +1804,11 @@ public <T> void outputWindowedValue(
outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo));
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return currentElement.causedByDrain();
}
Comment thread
stankiewicz marked this conversation as resolved.

@Override
public State state(String stateId, boolean alwaysFetched) {
StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId);
Expand Down Expand Up @@ -1946,6 +1951,11 @@ public <T> void outputWindowedValue(
public CausedByDrain causedByDrain() {
return currentElement.causedByDrain();
}

@Override
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
return currentElement.causedByDrain();
Comment thread
stankiewicz marked this conversation as resolved.
}
}

/** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */
Expand Down
Loading