Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b23848b
Adds a new coder translator for Java SchemaCoder. Adds PipelineOption…
acrites Feb 17, 2026
2a637d9
Fixes whitespace in CHANGES file
acrites Feb 18, 2026
c3fc500
Addresses review comments. Renames v1/v2 to non-portable/portable, re…
acrites Feb 20, 2026
0e5aa28
Fixed formatting.
acrites Feb 23, 2026
fd08df2
Merge branch 'master' into new-schema-coder
acrites Feb 23, 2026
98ceb21
Updates formatting for CHANGES.md.
acrites Feb 23, 2026
1482cbd
Merge with recent changes to upstream.
acrites Feb 23, 2026
e0437a8
Cleans up CHANGES.md and changes new coder to take effect in 2.73 rel…
acrites Feb 24, 2026
a65e7c0
Refactors ModelCoderRegistrars so decisions about known coders are ma…
acrites Mar 24, 2026
3a396d2
Updates CHANGES.md
acrites Mar 24, 2026
67370da
Ran spotlessApply and fixed autoboxing warning.
acrites Mar 24, 2026
46f8236
Changes cleanup thread to forceExecute. Otherwise, we can deadlock if…
acrites Apr 4, 2026
19b704f
Changes cacheCommitFinalizers loop to only acquire the lock once and …
acrites Apr 6, 2026
f823ffa
Changes bundle finalizers to run in a dedicated ScheduledExecutorServ…
acrites Apr 7, 2026
19e3225
Changes thread pool to a single thread and moves scheduling cleanup u…
acrites Apr 7, 2026
a47758a
Merge branch 'apache:master' into master
acrites Apr 8, 2026
ba3533e
Uses other applied_finalize_ids after merging in https://github.com/a…
acrites Apr 8, 2026
c3af2e6
Merge branch 'master' into new-schema-coder
acrites Apr 13, 2026
e3fb918
Merge branch 'apache:master' into master
acrites Apr 13, 2026
a838d05
Changes update compatibility to 2.73 and below since this feature won…
acrites Apr 13, 2026
e2b7678
Merges master into branch.
acrites Apr 13, 2026
eace53c
Merges remote branch into local branch.
acrites Apr 13, 2026
d787de3
Merge branch 'master' of https://github.com/acrites/beam into new-sch…
acrites Apr 30, 2026
da82331
Adds missing # character to issue numbers in CHANGES.md.
acrites Apr 30, 2026
b827494
Unboxes Integer and Long to avoid AutoValueBoxedValues warning
acrites May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Portable Java SDK now encodes SchemaCoders in a portable way ([#34672](https://github.com/apache/beam/issues/34672)).
- Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.73")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47) ([#34672](https://github.com/apache/beam/issues/34672)).
- Fixes ([#36496](https://github.com/apache/beam/issues/36496)), ([#30276](https://github.com/apache/beam/issues/30276)), ([#29245](https://github.com/apache/beam/issues/29245)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public Boolean visit(OrFinallyTrigger trigger) {
private static byte[] serializeWindowingStrategy(
WindowingStrategy<?, ?> windowingStrategy, PipelineOptions options) {
try {
SdkComponents sdkComponents = SdkComponents.create();
SdkComponents sdkComponents = SdkComponents.create(options);

String workerHarnessContainerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1331,13 +1331,13 @@ public DataflowPipelineJob run(Pipeline pipeline) {
// with the SDK harness image (which implements Fn API).
//
// The same Environment is used in different and contradictory ways, depending on whether
// it is a v1 or v2 job submission.
// it is a portable or non-portable job submission.
RunnerApi.Environment defaultEnvironmentForDataflow =
Environments.createDockerEnvironment(workerHarnessContainerImageURL);

// The SdkComponents for portable an non-portable job submission must be kept distinct. Both
// The SdkComponents for portable and non-portable job submission must be kept distinct. Both
// need the default environment.
SdkComponents portableComponents = SdkComponents.create();
SdkComponents portableComponents = SdkComponents.create(options);
portableComponents.registerEnvironment(
defaultEnvironmentForDataflow
.toBuilder()
Expand Down Expand Up @@ -1369,28 +1369,29 @@ public DataflowPipelineJob run(Pipeline pipeline) {
dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

if (useUnifiedWorker(options)) {
LOG.info("Skipping v1 transform replacements since job will run on v2.");
LOG.info(
"Skipping non-portable transform replacements since job will run on portable worker.");
} else {
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
// Now rewrite things to be as needed for non-portable (mutates the pipeline).
// This way the job submitted is valid for portable and non-portable, simultaneously.
replaceV1Transforms(pipeline);
}
// Capture the SdkComponents for look up during step translations
SdkComponents dataflowV1Components = SdkComponents.create();
dataflowV1Components.registerEnvironment(
// Capture the SdkComponents for look up during step translations.
SdkComponents dataflowNonPortableComponents = SdkComponents.create(options);
dataflowNonPortableComponents.registerEnvironment(
defaultEnvironmentForDataflow
.toBuilder()
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
// No need to perform transform upgrading for the Runner v1 proto.
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);
// No need to perform transform upgrading for the non-portable runner proto.
RunnerApi.Pipeline dataflowNonPortablePipelineProto =
PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Dataflow v1 pipeline proto:\n{}",
TextFormat.printer().printToString(dataflowV1PipelineProto));
"Dataflow non-portable worker pipeline proto:\n{}",
TextFormat.printer().printToString(dataflowNonPortablePipelineProto));
}

// Set a unique client_request_id in the CreateJob request.
Expand All @@ -1410,7 +1411,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {

JobSpecification jobSpecification =
translator.translate(
pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);
pipeline,
dataflowNonPortablePipelineProto,
dataflowNonPortableComponents,
this,
packages);

if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
List<String> experiments =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.auto.value.AutoValue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -92,6 +93,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
Expand Down Expand Up @@ -166,7 +169,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();

private SdkComponents createSdkComponents(PipelineOptions options) {
SdkComponents sdkComponents = SdkComponents.create();
SdkComponents sdkComponents = SdkComponents.create(options);

String containerImageURL =
DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class));
Expand Down Expand Up @@ -1263,7 +1266,7 @@ public String apply(byte[] input) {
file1.deleteOnExit();
File file2 = File.createTempFile("file2-", ".txt");
file2.deleteOnExit();
SdkComponents sdkComponents = SdkComponents.create();
SdkComponents sdkComponents = SdkComponents.create(options);
sdkComponents.registerEnvironment(
Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options))
.toBuilder()
Expand Down Expand Up @@ -1839,4 +1842,53 @@ public OffsetRange getInitialRange(@SuppressWarnings("unused") @Element String e
return null;
}
}

/**
 * A minimal {@link AutoValue} carrying one string and two numeric fields; its schema is
 * inferred via {@link AutoValueSchema}, which lets tests exercise SchemaCoder translation.
 */
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class SimpleAutoValue {
  public abstract String getString();

  public abstract int getInt32();

  public abstract long getInt64();

  /** Creates an instance populated with the given field values. */
  public static SimpleAutoValue of(String stringValue, int int32Value, long int64Value) {
    return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(
        stringValue, int32Value, int64Value);
  }
}

/**
 * Verifies that a pipeline producing a schema-inferred type translates its SchemaCoder to the
 * portable schema coder URN by default, and to the legacy custom Java coder URN when update
 * compatibility is pinned to 2.73 or earlier.
 */
@Test
public void testSchemaCoderTranslation() throws Exception {
  DataflowPipelineOptions options = buildPipelineOptions();
  Pipeline pipeline = Pipeline.create(options);
  pipeline
      .apply(Impulse.create())
      .apply(
          MapElements.via(
              new SimpleFunction<byte[], SimpleAutoValue>() {
                @Override
                public SimpleAutoValue apply(byte[] input) {
                  return SimpleAutoValue.of("foo", 5, 10L);
                }
              }))
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

  // Default behavior: the SchemaCoder is encoded with the portable schema coder URN.
  assertEquals("beam:coder:schema:v1", translatedSchemaCoderUrn(pipeline, options));

  // Prior to version 2.74, SchemaCoders are translated as custom Java coders.
  options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.73");
  assertEquals("beam:coders:javasdk:0.1", translatedSchemaCoderUrn(pipeline, options));
}

/** Translates {@code pipeline} to proto and returns the URN recorded for "SchemaCoder". */
private String translatedSchemaCoderUrn(Pipeline pipeline, DataflowPipelineOptions options)
    throws Exception {
  SdkComponents sdkComponents = createSdkComponents(options);
  RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);
  Map<String, RunnerApi.Coder> coders = pipelineProto.getComponents().getCodersMap();
  assertTrue(coders.containsKey("SchemaCoder"));
  return coders.get("SchemaCoder").getSpec().getUrn();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void testLengthPrefixingOfKeyCoderInStatefulExecutableStage() throws Exce
// Add another stateful stage with a non-standard key coder
Pipeline p = Pipeline.create();
Coder<Void> keycoder = VoidCoder.of();
assertThat(ModelCoderRegistrar.isKnownCoder(keycoder), is(false));
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
assertThat(coderRegistrar.isKnownCoder(keycoder, p.getOptions()), is(false));
p.apply("impulse", Impulse.create())
.apply(
"create",
Expand Down Expand Up @@ -165,7 +166,8 @@ public void onTimer() {}
public void testLengthPrefixingOfInputCoderExecutableStage() throws Exception {
Pipeline p = Pipeline.create();
Coder<Void> voidCoder = VoidCoder.of();
assertThat(ModelCoderRegistrar.isKnownCoder(voidCoder), is(false));
ModelCoderRegistrar coderRegistrar = new ModelCoderRegistrar();
assertThat(coderRegistrar.isKnownCoder(voidCoder, p.getOptions()), is(false));
p.apply("impulse", Impulse.create())
.apply(
ParDo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.dataflow.qual.Deterministic;
Expand Down Expand Up @@ -62,6 +64,8 @@ private static class DefaultTranslationContext implements TranslationContext {}

private static @MonotonicNonNull BiMap<Class<? extends Coder>, String> knownCoderUrns;

private static @MonotonicNonNull List<CoderTranslatorRegistrar> coderTranslatorRegistrars;

private static @MonotonicNonNull Map<Class<? extends Coder>, CoderTranslator<? extends Coder>>
knownTranslators;

Expand All @@ -80,6 +84,53 @@ static BiMap<Class<? extends Coder>, String> getKnownCoderUrns() {
return knownCoderUrns;
}

/**
 * Discovers every {@link CoderTranslatorRegistrar} on the classpath via {@link ServiceLoader}
 * and snapshots them into an immutable list, so later lookups never re-run service discovery.
 */
private static void initializeCoderTranslatorRegistrars() {
  coderTranslatorRegistrars =
      ImmutableList.copyOf(ServiceLoader.load(CoderTranslatorRegistrar.class));
}

/**
 * Returns whether any registered {@link CoderTranslatorRegistrar} recognizes {@code coder},
 * given the pipeline {@code options} (registrars may vary behavior by options, e.g. update
 * compatibility version).
 */
static boolean isKnownCoder(Coder<?> coder, PipelineOptions options) {
  if (coderTranslatorRegistrars == null) {
    initializeCoderTranslatorRegistrars();
  }
  // Known iff at least one registrar claims the coder.
  return coderTranslatorRegistrars.stream()
      .anyMatch(registrar -> registrar.isKnownCoder(coder, options));
}

/**
 * Returns the {@link CoderTranslator} registered for {@code coderClass}, or {@code null} if no
 * registrar knows it.
 *
 * <p>Accepts a null {@code coderClass} (and returns null) because callers may pass the
 * possibly-null result of {@link #getCoderForUrn} directly.
 */
static CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass) {
  if (coderTranslatorRegistrars == null) {
    initializeCoderTranslatorRegistrars();
  }
  if (coderClass == null) {
    // Guard: an unknown URN yields a null class; treat it as an unknown coder rather than
    // risking an NPE inside a registrar implementation.
    return null;
  }
  for (CoderTranslatorRegistrar registrar : coderTranslatorRegistrars) {
    // Wildcard-typed local (was a raw CoderTranslator) keeps unchecked warnings localized.
    CoderTranslator<? extends Coder> translator = registrar.getCoderTranslator(coderClass);
    if (translator != null) {
      return translator;
    }
  }
  return null;
}

/**
 * Returns the {@link Coder} class the first registrar associates with {@code coderUrn}, or
 * {@code null} when no registrar recognizes the URN.
 */
static Class<? extends Coder> getCoderForUrn(String coderUrn) {
  if (coderTranslatorRegistrars == null) {
    initializeCoderTranslatorRegistrars();
  }
  return coderTranslatorRegistrars.stream()
      .map(registrar -> registrar.getCoderForUrn(coderUrn))
      .filter(coderClass -> coderClass != null)
      .findFirst()
      .orElse(null);
}

@VisibleForTesting
@Deterministic
static Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getKnownTranslators() {
Expand Down Expand Up @@ -107,7 +158,7 @@ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOE

public static RunnerApi.Coder toProto(Coder<?> coder, SdkComponents components)
throws IOException {
if (getKnownCoderUrns().containsKey(coder.getClass())) {
if (isKnownCoder(coder, components.getPipelineOptions())) {
return toKnownCoder(coder, components);
}

Expand All @@ -129,7 +180,10 @@ private static RunnerApi.Coder toUnknownCoderWrapper(UnknownCoderWrapper coder)

private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
CoderTranslator translator = getKnownTranslators().get(coder.getClass());
CoderTranslator translator = getCoderTranslator(coder.getClass());
if (translator == null) {
throw new IOException("Unable to find CoderTranslator for known Coder");
}
List<String> componentIds = registerComponents(coder, translator, components);
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
Expand Down Expand Up @@ -186,8 +240,8 @@ private static Coder<?> fromKnownCoder(
components.getComponents().getCodersOrThrow(componentId), components, context);
coderComponents.add(innerCoder);
}
Class<? extends Coder> coderType = getKnownCoderUrns().inverse().get(coderUrn);
CoderTranslator<?> translator = getKnownTranslators().get(coderType);
Class<? extends Coder> coderType = getCoderForUrn(coderUrn);
CoderTranslator<?> translator = getCoderTranslator(coderType);
if (translator != null) {
return translator.fromComponents(
coderComponents, coder.getSpec().getPayload().toByteArray(), context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* additional payload, which is not currently supported. This exists as a temporary measure.
*/
public interface CoderTranslator<T extends Coder<?>> {

/** Extract all component {@link Coder coders} within a coder. */
List<? extends Coder<?>> getComponents(T from);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A registrar of {@link Coder} URNs to the associated {@link CoderTranslator}. */
@SuppressWarnings({
Expand All @@ -34,4 +36,18 @@ public interface CoderTranslatorRegistrar {

/** Returns a mapping of URN to {@link CoderTranslator}. */
Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators();

/**
* Returns whether the given Coder is known to this CoderTranslatorRegistrar. If the Coder is
* known, then getCoderTranslator() will return a non-null CoderTranslator.
*/
boolean isKnownCoder(Coder<?> coder, PipelineOptions options);

/** Returns the CoderTranslator to use for this Coder, or null if the Coder is not known. */
@Nullable
CoderTranslator<? extends Coder> getCoderTranslator(Class<? extends Coder> coderClass);

/** Returns the Coder to use for the given Urn, or null if the Urn is for an unknown Coder. */
@Nullable
Class<? extends Coder> getCoderForUrn(String coderUrn);
}
Loading
Loading