
[WIP] Add Spark 4 runner#38255

Open
tkaymak wants to merge 26 commits into apache:master from tkaymak:spark4-runner-slim

Conversation

@tkaymak
Contributor

@tkaymak tkaymak commented Apr 21, 2026

Currently work in progress.
Addresses #36841. Replaces #38212.

Builds on the shared base + per-version overrides plumbing introduced in #38233 (merged). With that in place, Spark 4 support is reduced to:

  • 11 genuine Java override files under runners/spark/4/src/ (no duplication of files that match the Spark 3 baseline)
  • Build wiring: runners/spark/4/build.gradle, job-server module, container Dockerfile
  • Small Scala 2.12/2.13 compatibility tweaks to 7 files in the shared base under runners/spark/src/
  • gradle.properties: spark_versions=3,4
  • CI: PreCommit + PostCommit Spark 4 workflows

The diff is smaller than in #38212, which predated the refactor and duplicated the entire Spark 3 source tree.

cc @Abacn — this is the slim follow-up we discussed.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

tkaymak and others added 15 commits April 21, 2026 07:45
Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
dependency, and require Java 17 when building against Spark 4.

Register the new :runners:spark:4 module in settings.gradle.kts.

These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add the Gradle build file for the Spark 4 structured streaming runner.
The module mirrors runners/spark/3/ — it inherits the shared RDD-base
source from runners/spark/src/ via copySourceBase and adds its own
Structured Streaming implementation in src/main/java.

Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (the Spark 4 runner supports only
  batch execution via structured streaming).
- Unconditionally adds --add-opens JVM flags required by Kryo on
  Java 17 (Spark 4's minimum).
- Binds Spark driver to 127.0.0.1 for macOS compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add the Spark 4 structured streaming runner implementation and tests.
Most files are adapted from the Spark 3 structured streaming runner
with targeted changes for Spark 4 / Scala 2.13 API compatibility.

Key Spark 4-specific changes (diff against runners/spark/3/src/):

EncoderFactory — Replaced the direct ExpressionEncoder constructor
  (removed in Spark 4) with BeamAgnosticEncoder, a named class
  implementing both AgnosticExpressionPathEncoder (for expression
  delegation via toCatalyst/fromCatalyst) and AgnosticEncoders
  .StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute
  plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst
  methods substitute the provided input expression via transformUp,
  enabling correct nesting inside composite encoders like
  Encoders.tuple().

EncoderHelpers — Added toExpressionEncoder() helper to handle Spark 4
  built-in encoders that are AgnosticEncoder subclasses rather than
  ExpressionEncoder.

GroupByKeyTranslatorBatch — Migrated from internal catalyst Expression
  API (CreateNamedStruct, Literal$) to public Column API (struct(),
  lit(), array()), as required by Spark 4.

BoundedDatasetFactory — Use classic.Dataset$.MODULE$.ofRows() as
  Dataset moved to org.apache.spark.sql.classic in Spark 4.

ScalaInterop — Replace WrappedArray.ofRef (removed in Scala 2.13)
  with JavaConverters.asScalaBuffer().toList() in seqOf().

GroupByKeyHelpers, CombinePerKeyTranslatorBatch — Replace
  TraversableOnce with IterableOnce (Scala 2.13 rename).

SparkStructuredStreamingPipelineResult — Replace sparkproject.guava
  with Beam's vendored Guava.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add GitHub Actions workflows for the Spark 4 runner module:

- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on
  changes to runners/spark/**. Currently a no-op (the sparkVersions
  map is empty) but scaffolds future patch version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs
  the structured streaming test suite on Java 17.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove endOfData() call in close method.
Add job-server and container build configurations for Spark 4,
mirroring the existing Spark 3 job-server setup. The container
uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared
spark_job_server.gradle gains a requireJavaVersion conditional
for Spark 4 parent projects.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The hostname binding hack is no longer needed now that the local
machine resolves its hostname to 127.0.0.1 via /etc/hosts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Called out in /ultrareview as a missing contributor checklist item.
Adds a Highlight line and a New Features / Improvements entry under
the 2.74.0 Unreleased section, referencing issue apache#36841.
Per /ultrareview feedback: the one-line comment didn't make clear why
the cast is safe. Expand it to note that SparkSession.builder() always
returns a classic.SparkSession at runtime, which is why the downcast
avoids reflection.
Per /ultrareview feedback: the fallback branch silently swallowed the
second ClassNotFoundException. In practice one of the two classes is
always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could
mask a broken classpath. Emit a LOG.warn instead.
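A minimal, self-contained sketch of the pattern this commit describes: try the Scala 2.12 class name first, fall back to the 2.13 one, and warn rather than silently swallowing the second ClassNotFoundException. The helper name is illustrative, and the warning goes to stderr here instead of the SLF4J LOG.warn used in the actual change.

```java
public class ScalaClassLookup {
    static Class<?> loadFirstAvailable(String primary, String fallback) {
        try {
            return Class.forName(primary);
        } catch (ClassNotFoundException first) {
            try {
                return Class.forName(fallback);
            } catch (ClassNotFoundException second) {
                // Do not swallow silently: if neither class is present the
                // classpath is likely broken, so surface that loudly.
                System.err.println(
                    "WARN: neither " + primary + " nor " + fallback + " found on classpath");
                return null;
            }
        }
    }

    public static void main(String[] args) {
        // Real Scala stdlib names (2.12 vs 2.13); neither is on a plain JDK
        // classpath, so this exercises the warning branch.
        Class<?> cls = loadFirstAvailable(
            "scala.collection.mutable.WrappedArray", "scala.collection.immutable.ArraySeq");
        System.out.println(cls == null ? "not found" : cls.getName());
    }
}
```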
Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks
were lexicographic string comparisons. They happened to work for 3.5.0
and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release
would compare less than "3.5.0" and silently drop the Spark 3.5+
dependencies and exclusions.

Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`,
keeps numeric parts, and compares component-by-component. Replace all
five call sites.
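The component-wise comparison the `isSparkAtLeast` closure performs can be sketched in plain Java (the actual closure is Groovy inside spark_runner.gradle; names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class VersionCompare {
    // Tokenize on '.' and '-', keeping only the numeric parts.
    static List<Integer> numericParts(String version) {
        List<Integer> parts = new ArrayList<>();
        for (String token : version.split("[.\\-]")) {
            if (token.matches("\\d+")) {
                parts.add(Integer.parseInt(token));
            }
        }
        return parts;
    }

    // Compare component-by-component, padding the shorter list with zeros.
    static boolean isAtLeast(String version, String minimum) {
        List<Integer> v = numericParts(version);
        List<Integer> m = numericParts(minimum);
        int n = Math.max(v.size(), m.size());
        for (int i = 0; i < n; i++) {
            int a = i < v.size() ? v.get(i) : 0;
            int b = i < m.size() ? m.get(i) : 0;
            if (a != b) {
                return a > b;
            }
        }
        return true; // all components equal
    }

    public static void main(String[] args) {
        // The lexicographic bug: "3.10.0" compares less than "3.5.0" as strings.
        System.out.println("3.10.0".compareTo("3.5.0") >= 0); // false (the bug)
        System.out.println(isAtLeast("3.10.0", "3.5.0"));     // true
        System.out.println(isAtLeast("4.0.2", "3.5.0"));      // true
        System.out.println(isAtLeast("3.4.1", "3.5.0"));      // false
    }
}
```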
With spark_runner.gradle now layering per-major source overrides on top
of the shared base, runners/spark/4/src/ no longer needs to duplicate
62 byte-identical structured-streaming files. Keep only the 11 files
that actually differ for Spark 4 / Scala 2.13. Switch the build.gradle
to spark_major = '4' (the new mechanism) and bump spark_versions to 3,4.

Compiled output unchanged — the deleted files are reproduced identically
inside build/source-overrides by the Copy task.
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces support for Apache Spark 4 in the Beam Spark runner. By leveraging the existing shared base and per-version override plumbing, the implementation remains lightweight and avoids duplicating the Spark 3 source tree. The changes include necessary build system updates, compatibility adjustments for Scala 2.13, and the addition of required CI infrastructure to ensure stability for the new Spark 4 runner.

Highlights

  • Spark 4 Runner Implementation: Added a new Spark 4 runner module (:runners:spark:4) and job-server, supporting batch processing with Spark 4.0.2 and Scala 2.13.
  • Build Infrastructure: Updated BeamModulePlugin and gradle.properties to include Spark 4, and configured build wiring to require Java 17 for Spark 4 compatibility.
  • Shared Code Compatibility: Applied Scala 2.12/2.13 compatibility tweaks to shared Spark runner code to support both Spark 3 and Spark 4.
  • CI/CD Integration: Added new PreCommit and PostCommit CI workflows for Spark 4.


Ignored Files
  • Ignored by pattern: .github/workflows/** (3)
    • .github/workflows/README.md
    • .github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
    • .github/workflows/beam_PreCommit_Java_Spark4_Versions.yml

@tkaymak tkaymak mentioned this pull request Apr 21, 2026
@tkaymak tkaymak changed the title [Spark Runner] Add Spark 4 runner (slim, override-only) [Spark Runner] Add Spark 4 runner Apr 21, 2026
@tkaymak tkaymak changed the title [Spark Runner] Add Spark 4 runner [WIP] Add Spark 4 runner Apr 21, 2026
tkaymak added 4 commits April 21, 2026 08:17
scala.Serializable was removed in Scala 2.13. java.io.Serializable
works identically on both Scala 2.12 and 2.13, so this can live in
the shared base instead of needing a Spark-4-only override file.
…base

Wrap Throwables.getRootCause(e).getMessage() in String.valueOf(...)
to make the error logging robust to a null root-cause message. The
behaviour change applies equally to Spark 3 and Spark 4, so the
fix lives in the shared base and the Spark-4 override is dropped.
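A minimal sketch of this null-safety fix: a root cause's getMessage() may be null, and String.valueOf(...) turns that into the literal "null" instead of risking an NPE downstream. The local getRootCause here is a stand-in for Guava's Throwables.getRootCause.

```java
public class RootCauseLogging {
    // Stand-in for com.google.common.base.Throwables.getRootCause.
    static Throwable getRootCause(Throwable t) {
        Throwable cause = t;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }

    static String safeRootMessage(Throwable t) {
        // String.valueOf(null) yields "null", keeping error logging robust
        // when the root cause carries no message.
        return String.valueOf(getRootCause(t).getMessage());
    }

    public static void main(String[] args) {
        Exception noMessage = new RuntimeException(new IllegalStateException());
        Exception withMessage = new RuntimeException(new IllegalStateException("boom"));
        System.out.println(safeRootMessage(noMessage));   // "null", not an NPE
        System.out.println(safeRootMessage(withMessage)); // "boom"
    }
}
```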
… PipelineResult

Two changes that previously lived only in the Spark-4 override and
are equally valid for Spark 3:

1. cancel() now actually cancels the executing future
   (pipelineExecution.cancel(true)) in addition to setting the state
   to CANCELLED. Without this, calling cancel() left the pipeline
   running silently — a real bug, not a Spark-4 specific concern.

2. Switch from Spark's shaded guava (org.sparkproject.guava) to the
   Beam-vendored guava that is already on the classpath. Spark 4
   no longer exposes the sparkproject guava package; using the
   vendored one removes the version coupling for both runners.
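Change (1) above can be illustrated with a small sketch: cancel() must actually cancel the executing future, not just flip the state flag. All names here are hypothetical, not Beam's actual PipelineResult implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancellablePipeline {
    enum State { RUNNING, CANCELLED }

    private final Future<?> pipelineExecution;
    private volatile State state = State.RUNNING;

    CancellablePipeline(Future<?> pipelineExecution) {
        this.pipelineExecution = pipelineExecution;
    }

    State cancel() {
        // The bug: setting state alone leaves the underlying work running.
        // The fix: also cancel the future, interrupting the worker thread.
        pipelineExecution.cancel(true);
        state = State.CANCELLED;
        return state;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> work = executor.submit(() -> {
            try {
                Thread.sleep(60_000); // simulate a long-running pipeline
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        CancellablePipeline result = new CancellablePipeline(work);
        System.out.println(result.cancel());    // CANCELLED
        System.out.println(work.isCancelled()); // true
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```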
Empty commit to re-run CI. The only failure on the prior head was
UnboundedScheduledExecutorServiceTest.testThreadsAreAddedOnlyAsNeededWithContention,
a known flake (apache#31590) — the test itself acknowledges
contention-induced extra threads in its inline comment. Squash or
drop on rebase before merge.
@tkaymak
Contributor Author

tkaymak commented Apr 21, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces an experimental Spark 4 structured streaming runner for Java, built against Spark 4.0.2 and Scala 2.13, requiring Java 17. The changes include new modules for the Spark 4 runner and job server, updates to shared Spark source for Scala 2.12/2.13 compatibility, and various dependency adjustments. Feedback includes a critical fix for handling multi-windowed data in EncoderHelpers, removing redundant or unchecked casts in BoundedDatasetFactory and GroupByKeyTranslatorBatch, and addressing fragile reflection and implementation details in EncoderFactory.

Comment on lines +568 to +570
  public static Instant maxTimestamp(Iterable<BoundedWindow> windows) {
    return Iterables.getOnlyElement(windows).maxTimestamp();
  }
Contributor

high

Iterables.getOnlyElement(windows) will throw an IllegalArgumentException if the WindowedValue is associated with multiple windows (e.g., after a sliding window assignment). This will cause the pipeline to crash at runtime. The method should instead return the maximum timestamp across all associated windows to safely handle multi-windowed data.

Suggested change
  public static Instant maxTimestamp(Iterable<BoundedWindow> windows) {
    return Iterables.getOnlyElement(windows).maxTimestamp();
  }
  public static Instant maxTimestamp(Iterable<? extends BoundedWindow> windows) {
    Instant maxTimestamp = null;
    for (BoundedWindow window : windows) {
      Instant timestamp = window.maxTimestamp();
      if (maxTimestamp == null || timestamp.isAfter(maxTimestamp)) {
        maxTimestamp = timestamp;
      }
    }
    return maxTimestamp;
  }

Contributor Author

Done in 334dcd3 — fixed in both the shared base (runners/spark/src/.../EncoderHelpers.java) and the Spark 4 override. The method now iterates and returns the max maxTimestamp() across all windows, with a Preconditions.checkNotNull to fail loudly on the (not expected in practice) empty-windows case rather than silently returning null.

  try {
    PipelineOptions options = params.options.get();
    long desiredSize = source.getEstimatedSizeBytes(options) / params.numPartitions;
    List<BoundedSource<T>> split = (List<BoundedSource<T>>) source.split(desiredSize, options);
Contributor

medium

The cast to List<BoundedSource<T>> is unchecked and unnecessary. Since source.split returns List<? extends BoundedSource<T>>, you can use that type directly. This is compatible with the subsequent stream processing and avoids potential heap pollution warnings.

Suggested change
List<BoundedSource<T>> split = (List<BoundedSource<T>>) source.split(desiredSize, options);
List<? extends BoundedSource<T>> split = source.split(desiredSize, options);

Contributor Author

Done in 9f53d5a — fixed in both the shared base and the Spark 4 override. source.split already returns List<? extends BoundedSource<T>>, so the cast was both unchecked and unnecessary.

  .groupByKey(fun1(t -> t._1()), windowedKeyEnc)
  .mapValues(fun1(t -> t._2()), valueEnc)
  .mapGroups(
      fun2((wKey, it) -> windowedKV(wKey, iterableOnce((Iterator<V>) it))),
Contributor

medium

The cast (Iterator<V>) is redundant as the it parameter is already inferred as Iterator<V> from the mapGroups method signature.

Suggested change
fun2((wKey, it) -> windowedKV(wKey, iterableOnce((Iterator<V>) it))),
fun2((wKey, it) -> windowedKV(wKey, iterableOnce(it))),

Contributor Author

Done in 8bebced — dropped the cast (override only; the analogous call in the base translator already does iterableOnce(it) without a cast).

Comment on lines +48 to +49
  private static final Constructor<StaticInvoke> STATIC_INVOKE_CONSTRUCTOR =
      (Constructor<StaticInvoke>) StaticInvoke.class.getConstructors()[0];
Contributor

medium

Relying on getConstructors()[0] to find the StaticInvoke constructor is fragile because the order of constructors is not guaranteed by the JVM. It is safer to look up the constructor by its parameter types or to iterate through all constructors and find the one with the expected number of parameters.

Contributor Author

Done in 9c071c5 — replaced getConstructors()[0] with a primaryConstructor(...) helper that picks the public constructor with the most parameters. The downstream switch on getParameterCount() already dispatches on the right argument shape per Spark version, so this just makes the choice deterministic across JVMs / Spark releases. Comment block above the constants explains the intent.
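The "pick the public constructor with the most parameters" idea can be sketched generically; primaryConstructor is the helper name from the reply above, and the Demo class is illustrative rather than Spark's StaticInvoke.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Comparator;

public class ConstructorPick {
    // getConstructors() returns constructors in no guaranteed order, so
    // deterministically pick the widest (most parameters) public one.
    static Constructor<?> primaryConstructor(Class<?> cls) {
        return Arrays.stream(cls.getConstructors())
            .max(Comparator.comparingInt(c -> c.getParameterCount()))
            .orElseThrow(() -> new IllegalStateException("no public constructor: " + cls));
    }

    // Demo class with two public constructors of different arity.
    public static class Demo {
        public Demo(int a) {}
        public Demo(int a, String b, boolean c) {}
    }

    public static void main(String[] args) {
        System.out.println(primaryConstructor(Demo.class).getParameterCount()); // 3
    }
}
```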

Comment on lines +147 to +149
  org$apache$spark$sql$catalyst$encoders$AgnosticEncoders$StructEncoder$_setter_$isStruct_$eq(
      boolean v) {
    // no-op: isStruct() is implemented directly above
Contributor

medium

The use of the Scala-mangled setter name org$apache$spark$sql$catalyst$encoders$AgnosticEncoders$StructEncoder$_setter_$isStruct_$eq is extremely fragile as it depends on internal Scala compiler implementation details for trait field setters. While this is a known workaround when implementing Scala traits with var fields in Java, it should be clearly documented as a dependency on Spark's internal Scala structure.

Contributor Author

Done in 9c071c5 — added a Javadoc on the method explaining that this is the synthetic setter the Scala compiler emits for trait val fields (<trait>$_setter_<field>_$eq), why we have to implement it from Java, and what to expect if Spark removes or renames the underlying isStruct field.

tkaymak added 2 commits April 21, 2026 10:19
Iterables.getOnlyElement(windows) crashes with IllegalArgumentException
when a WindowedValue is associated with more than one window (e.g. after
a sliding window assignment). Compute the max maxTimestamp() across all
associated windows instead, falling back to a clear error if the iterable
is unexpectedly empty.

Applied identically to the shared base and the Spark 4 override. Flagged
by Gemini Code Assist on PR apache#38255.
source.split returns List<? extends BoundedSource<T>>, which already
satisfies the subsequent stream usage. The cast was unchecked and would
trip heap-pollution warnings. Applied identically to the shared base
and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
tkaymak added 2 commits April 21, 2026 10:19
…slatorBatch

The (Iterator<V>) cast inside fun2 is redundant: fun2's signature
infers the iterator type. The shared base translator at the analogous
call site already calls iterableOnce(it) without a cast. Flagged by
Gemini Code Assist on PR apache#38255.
…cument trait setter

Replace getConstructors()[0] (JVM-defined ordering, not stable) with a
helper that picks the widest public constructor. The downstream switch
already dispatches on parameter count to pick the right argument shape
per Spark version, so this just makes the choice deterministic.

Also document the org$apache$spark...$_setter_$isStruct_$eq method —
it is the synthetic setter the Scala compiler emits for trait val fields,
required when implementing AgnosticEncoders.StructEncoder from Java.

Both flagged by Gemini Code Assist on PR apache#38255.
@tkaymak
Contributor Author

tkaymak commented Apr 21, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces the experimental Spark 4 structured streaming runner for Java, built against Spark 4.0.2 and Scala 2.13. It adds new modules for the Spark 4 runner and job-server, updates build configurations to support Java 17, and implements necessary translation logic and encoders for Spark 4 compatibility. Additionally, existing Spark runner code was refactored to maintain compatibility across Spark 3 and 4, including updates to Scala interop and Kryo registration. Review feedback identified minor typos in documentation and constant names, as well as a potential logic inconsistency in the reflective invocation of Spark constructors that could lead to fragility across versions.

* TODOs:
* <li>combine with context (CombineFnWithContext)?
* <li>combine with sideInputs?
* <li>other there other missing features?
Contributor

medium

Grammar issue in TODO comment: other there other missing features? should be are there other missing features?.

Suggested change
* <li>other there other missing features?
* <li>are there other missing features?

Contributor Author

Done in 203392e — fixed in both the shared base and the Spark 4 override (the same TODO block exists in both).

import scala.collection.immutable.List;

/**
* Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
Contributor

medium

Typo in Javadoc: build-in should be built-in.

Suggested change
* Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the build-in aggregation
* Translator for {@link GroupByKey} using {@link Dataset#groupByKey} with the built-in aggregation

Contributor Author

Done in 203392e — fixed in both the shared base and the Spark 4 override.

Comment on lines +295 to +308
      switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) {
        case 6:
          // Spark 3.1.x
          return INVOKE_CONSTRUCTOR.newInstance(obj, fun, type, seqOf(args), false, nullable);
        case 7:
          // Spark 3.2.0
          return INVOKE_CONSTRUCTOR.newInstance(
              obj, fun, type, seqOf(args), emptyList(), false, nullable);
        case 8:
        case 9:
          // Spark 3.2.x, 3.3.x, 4.0.x: Invoke constructor is 8 params in all these versions
          return INVOKE_CONSTRUCTOR.newInstance(
              obj, fun, type, seqOf(args), emptyList(), false, nullable, true);
        default:
Contributor

medium

The switch statement here uses STATIC_INVOKE_CONSTRUCTOR.getParameterCount() to determine the Spark version, but then invokes INVOKE_CONSTRUCTOR. This is fragile and inconsistent with the newInstance method below (line 320), which switches on the parameter count of the constructor it actually uses. It's safer and clearer to switch on INVOKE_CONSTRUCTOR.getParameterCount() directly.

      switch (INVOKE_CONSTRUCTOR.getParameterCount()) {
        case 6:
          // Spark 3.1.x
          return INVOKE_CONSTRUCTOR.newInstance(obj, fun, type, seqOf(args), false, nullable);
        case 7:
          // Spark 3.2.0
          return INVOKE_CONSTRUCTOR.newInstance(
              obj, fun, type, seqOf(args), emptyList(), false, nullable);
        case 8:
          // Spark 3.2.x, 3.3.x, 4.0.x
          return INVOKE_CONSTRUCTOR.newInstance(
              obj, fun, type, seqOf(args), emptyList(), false, nullable, true);
        default:
          throw new RuntimeException("Unsupported version of Spark");
      }

Contributor Author

Done in a438685 — switched on INVOKE_CONSTRUCTOR.getParameterCount() directly, matching the convention used by newInstance(...) further down. The shared base had the same latent inconsistency at line 94, so it's fixed there too. In the Spark 4 override the case 8: case 9: fallthrough collapses back to a single case 8: since INVOKE_CONSTRUCTOR stays at 8 params even where STATIC_INVOKE_CONSTRUCTOR grows to 9.

private static final DataType LIST_TYPE = new ObjectType(List.class);

// Collections / maps of these types can be (de)serialized without (de)serializing each member
private static final Set<Class<?>> PRIMITIV_TYPES =
Contributor

medium

Typo in constant name: PRIMITIV_TYPES should be PRIMITIVE_TYPES.

Suggested change
private static final Set<Class<?>> PRIMITIV_TYPES =
private static final Set<Class<?>> PRIMITIVE_TYPES =

Contributor Author

Done in 203392e — renamed to PRIMITIVE_TYPES in both the shared base and the Spark 4 override (along with the call site below).

}

private static <T> boolean isPrimitiveEnc(Encoder<T> enc) {
return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());
Contributor

medium

Update reference to renamed constant PRIMITIVE_TYPES.

Suggested change
return PRIMITIV_TYPES.contains(enc.clsTag().runtimeClass());
return PRIMITIVE_TYPES.contains(enc.clsTag().runtimeClass());

Contributor Author

Done in 203392e — call site updated alongside the constant rename, in both the shared base and the Spark 4 override.

tkaymak added 2 commits April 21, 2026 12:32
Three trivial typos flagged on PR apache#38255 round 2 review, applied
identically to the shared base and the Spark 4 override:

- CombinePerKeyTranslatorBatch: "other there other missing features?"
  -> "are there other missing features?"
- GroupByKeyTranslatorBatch: "build-in" -> "built-in"
- EncoderHelpers: PRIMITIV_TYPES -> PRIMITIVE_TYPES (constant + caller)
In EncoderFactory.invoke(Expression obj, ...), the switch was keyed on
STATIC_INVOKE_CONSTRUCTOR.getParameterCount() but the body actually
calls INVOKE_CONSTRUCTOR. This worked by coincidence: across the
supported Spark 3.x versions both constructors happen to share the
same parameter counts at the same dispatch points. A future Spark
release where the two diverge would silently pick the wrong branch.

Switch on INVOKE_CONSTRUCTOR.getParameterCount() to match the
constructor that is actually invoked, and align with the convention
used by newInstance() further down. In the Spark 4 override this also
lets us collapse the `case 8: case 9:` fallthrough back to a single
`case 8:`, since INVOKE_CONSTRUCTOR remains 8 params in Spark 4 even
though STATIC_INVOKE_CONSTRUCTOR grew to 9.

Applied identically to the shared base and the Spark 4 override.
Flagged by Gemini Code Assist on PR apache#38255.
@tkaymak
Contributor Author

tkaymak commented Apr 21, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces an experimental Spark 4 runner for the Java SDK, built against Spark 4.0.2 and Scala 2.13, and requiring Java 17. The changes include the addition of new modules for the Spark 4 runner and its job server, updates to build configurations, and significant refactoring of shared Spark runner code to ensure compatibility across Scala 2.12 and 2.13. Feedback is provided regarding a contradiction in the release notes where the runner is described as a "structured streaming runner" despite currently only supporting batch mode.

Comment thread CHANGES.md Outdated
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@tkaymak
Contributor Author

tkaymak commented Apr 21, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces an experimental Spark 4 runner for the Java SDK, supporting batch processing with Spark 4.0.2 and Scala 2.13 on Java 17. It adds new modules for the Spark 4 runner and job server, while updating shared Spark source code to maintain compatibility with both Spark 3 and Spark 4. Key changes include the implementation of Spark 4-specific translators and encoders, and the migration from JavaConversions to JavaConverters. A redundant unchecked cast was identified in GroupByKeyHelpers.java where v.getWindows() can be used directly.

  return v -> {
    T value = valueFn.apply(v);
    K key = v.getValue().getKey();
    Collection<BoundedWindow> windows = (Collection<BoundedWindow>) v.getWindows();
Contributor

medium

The cast to Collection<BoundedWindow> is redundant and involves an unchecked cast. v.getWindows() returns Collection<? extends BoundedWindow>, which is already an Iterable and can be passed directly to ScalaInterop.scalaIterator().

Suggested change
Collection<BoundedWindow> windows = (Collection<BoundedWindow>) v.getWindows();
return ScalaInterop.scalaIterator(v.getWindows()).map(w -> tuple(tuple(w, key), value));

@tkaymak
Contributor Author

tkaymak commented Apr 21, 2026

End-to-end smoke test (Spark 4.0.2 / Scala 2.13 / Java 17): Built :runners:spark:4 from 6eea847f5b, pulled it into :examples:java's runtime classpath, and ran WordCount. Pipeline completed cleanly (Batch pipeline execution complete.) and the word counts are correct.

A couple of items worth flagging before merge:

  1. Gemini round 4 nit not yet addressed: runners/spark/4/src/main/java/.../batch/GroupByKeyHelpers.java:83 has a redundant unchecked cast (Collection<BoundedWindow>) v.getWindows(). The identical line is also in the shared base at runners/spark/src/main/java/.../batch/GroupByKeyHelpers.java:83 — per the round-2 pattern, fix in both.

  2. SLF4J↔JUL loop in mixed classpaths: Spark 4 brings jul-to-slf4j; if the consuming module also pulls slf4j-jdk14 you get a StackOverflowError on the first log line. Worth a runner README note (or a runtime exclusion in the module).
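One possible shape of the runtime exclusion mentioned in (2), sketched against Gradle's standard dependency-exclusion API. The configuration name and coordinates are assumptions for illustration, not taken from this PR:

```groovy
// Hypothetical build.gradle fragment: keep Spark 4's jul-to-slf4j bridge but
// prevent a consuming module's slf4j-jdk14 backend from completing the
// JUL -> SLF4J -> JUL cycle that causes the StackOverflowError.
configurations.runtimeClasspath {
    exclude group: 'org.slf4j', module: 'slf4j-jdk14'
}
```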

@Abacn
Contributor

Abacn commented Apr 21, 2026

SLF4J↔JUL loop in mixed classpaths: Spark 4 brings jul-to-slf4j; if the consuming module also pulls slf4j-jdk14 you get a StackOverflowError on the first log line. Worth a runner README note (or a runtime exclusion in the module).

I remember this happened for Spark 3 as well: #26985

Yeah we can follow up on documentation or exclude dependencies
