Skip to content

Scalable Topics: pulsar::st implementation — protocol foundation (DAG watch, layout, routing)#600

Open
merlimat wants to merge 7 commits into
apache:mainfrom
merlimat:scalable-topics-cpp-impl
Open

Scalable Topics: pulsar::st implementation — protocol foundation (DAG watch, layout, routing)#600
merlimat wants to merge 7 commits into
apache:mainfrom
merlimat:scalable-topics-cpp-impl

Conversation

@merlimat

@merlimat merlimat commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Motivation

This is the first implementation PR for the scalable-topics C++ SDK (pulsar::st), building on the API definition merged in #598. It lays the protocol foundation the producer/consumer implementations will sit on: the wire contract, the build integration for lib/st, the client shell, and the DAG-watch machinery that keeps a scalable topic's segment layout current on the client.

The guiding architecture (per PIP-468): a scalable topic's segments are ordinary persistent topics broker-side, so the classic transport stack (connection pool, executors, lookup) is reused wholesale, and lib/st layers the scalable-topics machinery on top. The layout/routing/session code is a faithful port of the Java v5 client (ClientSegmentLayout / SegmentRouter / ScalableTopicHashing / DagWatchClient), so the two clients behave identically on the wire and route keys identically.

Modifications

One commit per step, in dependency order:

  1. Proto syncproto/PulsarApi.proto updated from apache/pulsar master, picking up the complete scalable-topics protocol: the DAG-watch commands (CommandScalableTopicLookup/Update/Close), segment/DAG messages, consumer-controller commands, topic/TC watch commands, and the supports_scalable_topics feature flag (BaseCommand types 70–81).

  2. Build wiringlib/st compiles as ST_OBJECT_LIB, a separate object library with per-target CXX_STANDARD 20 (the classic client stays C++17), merged into the same libpulsar shared/static artifacts. reflectcpp joins the vcpkg manifest, so jsonSchema<T>() builds against the real reflect-cpp. New pulsar-st-tests gtest binary (no broker needed).

  3. Client shellpulsar::st::ClientImpl wraps a classic pulsar::ClientImpl; PulsarClientBuilder::build() validates the config and maps the st policy structs onto ClientConfiguration; detail::ClientCore methods forward to it; closeAsync() bridges the classic callback onto the continuation Future<void>. Operations whose phase hasn't landed fail cleanly with ResultOperationNotSupported instead of hanging.

  4. Segment layout + routerScalableTopicHashing (one raw murmur3 per key; high 16 bits route segments, low 16 route entry-buckets per PIP-486 — Murmur3_32Hash grows a makeRawHash() mirroring the Java addition), SegmentLayout::fromProto (sorted active segments, sealed segments, per-segment broker addresses, controller address, legacy markers, entry-bucket splits, segment://…/hhhh-hhhh-id topic URIs), and SegmentRouter (range routing; all-legacy synthetic layouts fall back to signSafeMod(classicMurmur3(key), N) for exact v4 partitioned-producer parity; keyless messages round-robin).

  5. Wire plumbingCommands::newScalableTopicLookup/Close factories; supports_scalable_topics advertised on CONNECT; messageType() names for all twelve new command types (it runs on every incoming command and throws on unknown values); ClientConnection gains a DAG-watch session registry keyed by the client-assigned session id, SCALABLE_TOPIC_UPDATE dispatch (unknown sessions drop the push rather than killing the shared connection), close-notification for re-establishment, and now records the broker's CONNECTED feature flags (supportsScalableTopics()).

  6. DagWatchSession — the per-topic session tying it together: connect through the classic pool, gate on broker support, register + send the lookup; updates adopt the broker-resolved topic:// identity and swap the SegmentLayout (listener notified with new/old); reconnect with exponential backoff (100ms–30s) after the initial layout, fail-fast before it; broker errors before the first layout fail start() (TopicNotFound mapped); idempotent close() sends CommandScalableTopicClose.

  7. Avro backend — the registry reflectcpp port ships rfl/avro.hpp but exposes no avro feature, so the backend wasn't compiled and <avro.h> wasn't installed. A vcpkg overlay port (vcpkg-overlay/reflectcpp, wired via vcpkg-configuration.json) adds the feature (REFLECTCPP_AVRO + avro-c); building against the real backend exposed and fixed two API mismatches in AvroSchema.h (to_schema<T>().json_str(), write()std::vector<char>). Verified with a runtime avroSchema<T>() roundtrip. The overlay can be dropped once the upstream port grows the feature.

What works after this PR

PulsarClient::builder().serviceUrl(…).build() returns a live client on the real connection pool; close()/shutdown() work; the DAG-watch session can open, receive layouts, survive reconnects, and close. jsonSchema and avroSchema build and roundtrip against the real reflect-cpp. Producer/consumer creation intentionally returns ResultOperationNotSupported until their phases land.

Next phases (follow-up PRs)

Producer (per-segment fan-out driven by the watch session, keyed routing, st MessageId), then Queue/Stream/Checkpoint consumers, the C API, and transactions.

Verifying this change

  • pulsar-st-tests: 70 broker-free tests covering Expected/Future (incl. coroutine awaiter and abandoned-promise), the Schema codecs, builder validation and client lifecycle, layout parsing/routing (incl. mod-N parity with the classic hash), wire-level command roundtrips, and session lifecycle.
  • Full local builds of pulsarShared/pulsarStatic/pulsar-tests/StExamples (vcpkg, macOS); gcc-13 parity compiles with the repo warning flags; clang-format-11 clean.
  • The full session protocol and routing get their end-to-end validation against a scalable-topics broker in the producer PR.

Documentation

  • doc
  • doc-not-needed

merlimat added 7 commits July 1, 2026 11:39
Bring proto/PulsarApi.proto up to date with apache/pulsar master to pick up
the scalable-topics wire protocol that pulsar::st will implement against: the
DAG watch commands (CommandScalableTopicLookup/Update/Close), the segment/DAG
messages (SegmentInfoProto, SegmentBrokerAddress, ScalableTopicDAG,
SegmentState), the consumer-controller commands (CommandScalableTopicSubscribe
/SubscribeResponse/AssignmentUpdate, ScalableConsumerAssignment), the
topic/TC watch commands, and the supports_scalable_topics connect flag.

Straight upstream sync (+329/-7). The only client-visible field change is the
PIP-473 rename txn_ttl_seconds -> txn_ttl_millis (same field number, unused by
the C++ client). libpulsar rebuilds and links cleanly against the regenerated
sources.

Signed-off-by: Matteo Merli <mmerli@apache.org>
Wire the pulsar::st implementation into the build:

- lib/st compiles as ST_OBJECT_LIB, a separate object library with a
  per-target CXX_STANDARD 20 (the classic client stays C++17), merged into
  the same libpulsar shared and static artifacts (and into the MSVC
  pulsarStaticWithDeps fat lib; the non-MSVC fat lib merges pulsarStatic and
  inherits it). Seeded with the out-of-line MessageId/Checkpoint default
  constructors - final code, and exported symbols that prove the merge.

- reflectcpp joins vcpkg.json (0.24.0), making jsonSchema<T>() buildable
  against the real reflect-cpp instead of a stand-in. The vcpkg port also
  ships rfl/avro.hpp, so the avroSchema<T>() __has_include gate lights up
  too. StExamples now compiles SampleStJsonSchema.cc against it.

- tests/st: new pulsar-st-tests gtest binary (C++20, links pulsarStatic, no
  broker needed) with 42 tests covering Expected (value/error, throwing
  value(), monadic ops incl. the && overloads with move-only T), Future
  (get/timed get/listeners, thenApply incl. void and move-only mappers,
  abandoned-promise failure, first-writer-wins, co_await ready/suspend/error)
  and the Schema codecs (primitive roundtrips, big-endian wire format, short
  payload errors, BytesView zero-copy, unset schema, custom SerDe), plus a
  handle test that links the lib/st symbols out of the merged archive.

Verified: full local build of pulsarShared/pulsarStatic/pulsar-st-tests/
StExamples (vcpkg, macOS); 42/42 tests pass; pulsar::st symbols present in
both artifacts; gcc-13 -Wextra -Werror parity on lib/st; clang-format-11
clean.

Signed-off-by: Matteo Merli <mmerli@apache.org>
…tCore

First slice of lib/st: the client comes to life.

- pulsar::st::ClientImpl wraps a classic pulsar::ClientImpl, reusing its
  connection pool, executor pools, lookup service and memory limiting
  wholesale (a scalable topic's segments are ordinary persistent topics
  broker-side, so the transport stack is unchanged). The scalable-topics
  machinery (DAG watch, routing, controller sessions) layers on top in later
  phases; until each lands, the corresponding create/subscribe/transaction
  call fails cleanly with ResultOperationNotSupported instead of hanging.

- PulsarClientBuilder::build() validates the config (serviceUrl required;
  tlsPolicy.enabled requires a pulsar+ssl:// or https:// URL) and maps the
  st policy structs onto the classic ClientConfiguration. st TLS defaults
  are authoritative (validateHostname defaults to true, stricter than the
  classic default). connection.maxConnectionIdleTime has no classic
  counterpart yet (TODO). Construction failures surface as an Error, not an
  exception.

- detail::ClientCore methods defined, forwarding to the st ClientImpl.
  closeAsync() bridges the classic CloseCallback onto the continuation
  Future<void>; shutdown() tears down immediately.

- tests/st/StClientTest.cc: 8 broker-free tests covering builder validation,
  policy mapping, close/shutdown lifecycle, and the typed builder path
  (newProducer().create() flows through ProducerBuilder -> ClientCore -> st
  ClientImpl and back out as a typed Expected error).

Verified: 50/50 pulsar-st-tests green (macOS/clang, vcpkg build);
clang-format-11 clean.

Signed-off-by: Matteo Merli <mmerli@apache.org>
The client-side model of a scalable topic's segment layout and the key ->
segment router, ported from the Java v5 client (ClientSegmentLayout /
SegmentRouter / ScalableTopicHashing) so the two clients route identically.

- ScalableTopicHashing: one raw (unmasked) 32-bit Murmur3 hash per key,
  split into two independent 16-bit halves - high half routes segments, low
  half routes entry-buckets (PIP-486). Murmur3_32Hash grows a makeRawHash()
  accessor (the classic makeHash clears bit 31, which would confine the high
  half to [0, 0x7FFF]), mirroring the same addition on the Java class.

- SegmentLayout::fromProto builds an immutable per-epoch view from the
  broker's ScalableTopicDAG: active segments sorted by hash-range start,
  sealed segments, per-segment broker addresses (plain + TLS), controller
  address, legacy-segment markers and entry-bucket splits. Segment topics
  are computed as segment://tenant/ns/topic/<hhhh-hhhh-id>, matching
  SegmentTopicName.formatDescriptor. Malformed DAGs (bad scheme,
  out-of-bounds ranges) surface as Errors.

- SegmentRouter: keyed messages route by the 16-bit segment hash to the
  active segment whose inclusive range contains it; when EVERY active
  segment is legacy (synthetic layout for a not-yet-migrated regular topic)
  routing switches to signSafeMod(classicMurmur3(key), N) over segment_id,
  matching classic partitioned-topic producers exactly; keyless messages
  round-robin. No-active-segments and uncovered-hash conditions are typed
  Errors, not exceptions.

- tests/st/SegmentLayoutTest.cc: 14 tests porting the Java test semantics -
  hash split/unmasked-raw properties, layout parse (sorting, URIs, brokers,
  controller, legacy, splits), malformed-DAG rejection, keyed/deterministic/
  single-segment/uncovered/empty routing, all-legacy mod-N parity against
  the classic hash, mixed-layout range routing, and round-robin coverage.

Verified: 64/64 pulsar-st-tests green; pulsarShared still links; gcc-13
parity with the repo warning flags; clang-format-11 clean.

Signed-off-by: Matteo Merli <mmerli@apache.org>
Teach the shared transport layer the scalable-topics session protocol; the
lib/st DagWatchSession builds on these hooks next.

- Commands: newScalableTopicLookup(sessionId, topic, createIfMissing) and
  newScalableTopicClose(sessionId) factories. newConnect() now advertises
  supports_scalable_topics - safe on shared connections, since the broker
  only sends scalable commands on sessions the client explicitly opens.
  messageType() gains names for all twelve scalable command types; it runs
  on every incoming command and throws on unknown enum values, so this is
  required before the client can receive any scalable push.

- ClientConnection: a DAG-watch session registry keyed by the
  client-assigned session id. registerScalableTopicSession() refuses (returns
  false) on an already-closed connection; SCALABLE_TOPIC_UPDATE dispatches to
  the registered listener by session_id (an unknown session logs and drops
  the push - it may race a just-closed session - rather than treating it as
  a protocol violation, which would kill the shared connection); close()
  notifies every registered session with (error, nullptr) so it can
  re-establish on a new connection.

- tests/st/ScalableCommandsTest.cc: frame-level roundtrips of both factories
  (unwrap [totalSize][cmdSize][BaseCommand], reparse, verify fields) and
  messageType() coverage of all twelve new types.

Verified: 68/68 pulsar-st-tests green; pulsarShared/pulsarStatic and the
classic pulsar-tests target compile clean; clang-format-11 clean.

Signed-off-by: Matteo Merli <mmerli@apache.org>
The lib/st session that keeps a scalable topic's SegmentLayout current,
ported from the Java v5 client's DagWatchClient so the two clients behave
identically.

- start() acquires a connection through the classic client's pool (the
  topic:// spelling maps to its persistent:// twin for the connection
  lookup only; the wire lookup keeps the original), verifies the broker
  advertised scalable-topics support, registers the session on the
  connection and sends CommandScalableTopicLookup. The returned future
  completes with the initial layout or the lookup failure.

- Updates (initial response and pushes alike) arrive through the connection
  registry: the resolved topic:// identity from the broker is adopted, the
  DAG becomes a new SegmentLayout, the reconnect backoff resets, and the
  layout-change listener fires with (new, old). A malformed first response
  fails start() instead of hanging it; later malformed pushes keep the last
  good layout. Broker errors before the first layout fail start()
  (TopicNotFound mapped to ResultTopicNotFound); afterwards they are treated
  as transient, matching the Java client.

- On connection close after the initial layout the session reconnects with
  exponential backoff (100ms..30s) and re-issues the lookup; before the
  initial layout it fails start() rather than retrying behind the caller.
  Deliberately no epoch gating, same as the Java client: updates are ordered
  per connection and the old connection's registry is drained before the
  session re-attaches, so stale pushes cannot arrive.

- close() is idempotent: cancels the reconnect timer, unregisters, and sends
  CommandScalableTopicClose.

- ClientConnection now records the broker's CONNECTED feature flags and
  exposes supportsScalableTopics(), which the session gates on (previously
  the C++ client ignored broker feature flags entirely).

- tests/st/DagWatchSessionTest.cc: the lookup-name mapping and the
  no-connection lifecycle (fresh state, unique ids, close before start,
  double close). The full protocol is exercised end-to-end by the producer
  integration tests against a scalable-topics broker.

Verified: 70/70 pulsar-st-tests green; pulsarShared and the classic
pulsar-tests target compile clean; clang-format-11 clean.

Signed-off-by: Matteo Merli <mmerli@apache.org>
Review feedback on the vcpkg manifest: also include Avro support for
reflectcpp.

The registry reflectcpp port ships the rfl/avro.hpp headers but has no
"avro" feature, so libreflectcpp.a was built without the backend and the
headers could not even compile (they include <avro.h>, which was not
installed). Add a vcpkg overlay port (vcpkg-overlay/reflectcpp, wired via
vcpkg-configuration.json) that adds the feature - REFLECTCPP_AVRO plus an
avro-c (1.12.1) dependency - and turn it on in the manifest. Drop the
overlay once the upstream port grows the feature.

Building against the real backend (instead of the earlier structural
stand-in) exposed two API mismatches in AvroSchema.h, now fixed:
rfl::avro::to_schema<T>() returns a Schema<T> (use .json_str() for the
SchemaInfo definition, a proper Avro record schema), and
rfl::avro::write() returns std::vector<char>, not std::string. decode()
now uses the pointer+size read overload instead of copying into a string.

Verified with a runtime roundtrip through avroSchema<Order>() against the
rebuilt reflectcpp[avro] + avro-c: schema derivation, binary
encode/decode, and malformed-input-as-Error all pass; 70/70
pulsar-st-tests and the StExamples/pulsarShared builds stay green;
clang-format-11 clean.

Signed-off-by: Matteo Merli <mmerli@apache.org>
@merlimat merlimat marked this pull request as ready for review July 2, 2026 00:49
@merlimat merlimat requested review from BewareMyPower and shibd July 2, 2026 00:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant