Skip to content

Commit c581911

Browse files
committed
tests, remove default implementations.
1 parent 2542f18 commit c581911

6 files changed

Lines changed: 67 additions & 6 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
555555
return timestamp();
556556
}
557557

558+
@Override
559+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
560+
return elem.causedByDrain();
561+
}
562+
558563
@Override
559564
public String timerId(DoFn<InputT, OutputT> doFn) {
560565
throw new UnsupportedOperationException(
@@ -831,6 +836,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
831836
return timestamp();
832837
}
833838

839+
@Override
840+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
841+
return causedByDrain;
842+
}
843+
834844
@Override
835845
public String timerId(DoFn<InputT, OutputT> doFn) {
836846
return timerId;
@@ -1119,6 +1129,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
11191129
return timestamp;
11201130
}
11211131

1132+
@Override
1133+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1134+
throw new UnsupportedOperationException("CausedByDrain parameters are not supported.");
1135+
}
1136+
11221137
@Override
11231138
public String timerId(DoFn<InputT, OutputT> doFn) {
11241139
throw new UnsupportedOperationException("Timer parameters are not supported.");

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,9 +219,7 @@ interface ArgumentProvider<InputT, OutputT> {
219219
Instant timestamp(DoFn<InputT, OutputT> doFn);
220220

221221
/** Provide a reference to the caused by drain. */
222-
default CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
223-
return CausedByDrain.NORMAL;
224-
}
222+
CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn);
225223

226224
/** Provide a reference to the time domain for a timer firing. */
227225
TimeDomain timeDomain(DoFn<InputT, OutputT> doFn);

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1362,7 +1362,10 @@ private static Parameter analyzeExtraParameter(
13621362
return Parameter.keyT(paramT);
13631363
} else if (rawType.equals(TimeDomain.class)) {
13641364
return Parameter.timeDomainParameter();
1365-
} else if (rawType.equals(CausedByDrain.class)) {
1365+
} else if (CausedByDrain.class.isAssignableFrom(rawType)) {
1366+
methodErrors.checkArgument(
1367+
rawType.equals(CausedByDrain.class),
1368+
"CausedByDrain argument must have type org.apache.beam.sdk.values.CausedByDrain.");
13661369
return Parameter.causedByDrainParameter();
13671370
} else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
13681371
String sideInputId = getSideInputId(param.getAnnotations());

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,11 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
543543
return outerContext.timestamp();
544544
}
545545

546+
@Override
547+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
548+
return outerContext.causedByDrain();
549+
}
550+
546551
@Override
547552
public String timerId(DoFn<InputT, OutputT> doFn) {
548553
throw new UnsupportedOperationException();

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.beam.sdk.transforms.Sum;
5757
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
5858
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
59+
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.CausedByDrainParameter;
5960
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
6061
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
6162
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
@@ -78,6 +79,7 @@
7879
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
7980
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
8081
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
82+
import org.apache.beam.sdk.values.CausedByDrain;
8183
import org.apache.beam.sdk.values.KV;
8284
import org.apache.beam.sdk.values.Row;
8385
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -130,10 +132,11 @@ public void process(
130132
PipelineOptions options,
131133
@SideInput("tag1") String input1,
132134
@SideInput("tag2") Integer input2,
133-
BundleFinalizer bundleFinalizer) {}
135+
BundleFinalizer bundleFinalizer,
136+
CausedByDrain causedByDrain) {}
134137
}.getClass());
135138

136-
assertThat(sig.processElement().extraParameters().size(), equalTo(9));
139+
assertThat(sig.processElement().extraParameters().size(), equalTo(10));
137140
assertThat(sig.processElement().extraParameters().get(0), instanceOf(ElementParameter.class));
138141
assertThat(sig.processElement().extraParameters().get(1), instanceOf(TimestampParameter.class));
139142
assertThat(sig.processElement().extraParameters().get(2), instanceOf(WindowParameter.class));
@@ -146,6 +149,8 @@ public void process(
146149
assertThat(sig.processElement().extraParameters().get(7), instanceOf(SideInputParameter.class));
147150
assertThat(
148151
sig.processElement().extraParameters().get(8), instanceOf(BundleFinalizerParameter.class));
152+
assertThat(
153+
sig.processElement().extraParameters().get(9), instanceOf(CausedByDrainParameter.class));
149154
}
150155

151156
@Test
@@ -585,6 +590,31 @@ public void onTimer(BoundedWindow w) {}
585590
instanceOf(WindowParameter.class));
586591
}
587592

593+
@Test
594+
public void testCausedByDrainOnTimer() throws Exception {
595+
final String timerId = "some-timer-id";
596+
final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
597+
598+
DoFnSignature sig =
599+
DoFnSignatures.getSignature(
600+
new DoFn<String, String>() {
601+
602+
@TimerId(timerId)
603+
private final TimerSpec myfield1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
604+
605+
@ProcessElement
606+
public void process(ProcessContext c) {}
607+
608+
@OnTimer(timerId)
609+
public void onTimer(CausedByDrain causedByDrain) {}
610+
}.getClass());
611+
612+
assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(1));
613+
assertThat(
614+
sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
615+
instanceOf(CausedByDrainParameter.class));
616+
}
617+
588618
@Test
589619
public void testAllParamsOnTimer() throws Exception {
590620
final String timerId = "some-timer-id";

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,6 +1804,11 @@ public <T> void outputWindowedValue(
18041804
outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo));
18051805
}
18061806

1807+
@Override
1808+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1809+
return currentElement.causedByDrain();
1810+
}
1811+
18071812
@Override
18081813
public State state(String stateId, boolean alwaysFetched) {
18091814
StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId);
@@ -1946,6 +1951,11 @@ public <T> void outputWindowedValue(
19461951
public CausedByDrain causedByDrain() {
19471952
return currentElement.causedByDrain();
19481953
}
1954+
1955+
@Override
1956+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1957+
return currentElement.causedByDrain();
1958+
}
19491959
}
19501960

19511961
/** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */

0 commit comments

Comments
 (0)