Scalable Topics: pulsar::st implementation — protocol foundation (DAG watch, layout, routing)#600
Open
merlimat wants to merge 7 commits into
Open
Scalable Topics: pulsar::st implementation — protocol foundation (DAG watch, layout, routing)#600merlimat wants to merge 7 commits into
pulsar::st implementation — protocol foundation (DAG watch, layout, routing)#600merlimat wants to merge 7 commits into
Conversation
Bring proto/PulsarApi.proto up to date with apache/pulsar master to pick up the scalable-topics wire protocol that pulsar::st will implement against: the DAG watch commands (CommandScalableTopicLookup/Update/Close), the segment/DAG messages (SegmentInfoProto, SegmentBrokerAddress, ScalableTopicDAG, SegmentState), the consumer-controller commands (CommandScalableTopicSubscribe /SubscribeResponse/AssignmentUpdate, ScalableConsumerAssignment), the topic/TC watch commands, and the supports_scalable_topics connect flag. Straight upstream sync (+329/-7). The only client-visible field change is the PIP-473 rename txn_ttl_seconds -> txn_ttl_millis (same field number, unused by the C++ client). libpulsar rebuilds and links cleanly against the regenerated sources. Signed-off-by: Matteo Merli <mmerli@apache.org>
Wire the pulsar::st implementation into the build: - lib/st compiles as ST_OBJECT_LIB, a separate object library with a per-target CXX_STANDARD 20 (the classic client stays C++17), merged into the same libpulsar shared and static artifacts (and into the MSVC pulsarStaticWithDeps fat lib; the non-MSVC fat lib merges pulsarStatic and inherits it). Seeded with the out-of-line MessageId/Checkpoint default constructors - final code, and exported symbols that prove the merge. - reflectcpp joins vcpkg.json (0.24.0), making jsonSchema<T>() buildable against the real reflect-cpp instead of a stand-in. The vcpkg port also ships rfl/avro.hpp, so the avroSchema<T>() __has_include gate lights up too. StExamples now compiles SampleStJsonSchema.cc against it. - tests/st: new pulsar-st-tests gtest binary (C++20, links pulsarStatic, no broker needed) with 42 tests covering Expected (value/error, throwing value(), monadic ops incl. the && overloads with move-only T), Future (get/timed get/listeners, thenApply incl. void and move-only mappers, abandoned-promise failure, first-writer-wins, co_await ready/suspend/error) and the Schema codecs (primitive roundtrips, big-endian wire format, short payload errors, BytesView zero-copy, unset schema, custom SerDe), plus a handle test that links the lib/st symbols out of the merged archive. Verified: full local build of pulsarShared/pulsarStatic/pulsar-st-tests/ StExamples (vcpkg, macOS); 42/42 tests pass; pulsar::st symbols present in both artifacts; gcc-13 -Wextra -Werror parity on lib/st; clang-format-11 clean. Signed-off-by: Matteo Merli <mmerli@apache.org>
…tCore First slice of lib/st: the client comes to life. - pulsar::st::ClientImpl wraps a classic pulsar::ClientImpl, reusing its connection pool, executor pools, lookup service and memory limiting wholesale (a scalable topic's segments are ordinary persistent topics broker-side, so the transport stack is unchanged). The scalable-topics machinery (DAG watch, routing, controller sessions) layers on top in later phases; until each lands, the corresponding create/subscribe/transaction call fails cleanly with ResultOperationNotSupported instead of hanging. - PulsarClientBuilder::build() validates the config (serviceUrl required; tlsPolicy.enabled requires a pulsar+ssl:// or https:// URL) and maps the st policy structs onto the classic ClientConfiguration. st TLS defaults are authoritative (validateHostname defaults to true, stricter than the classic default). connection.maxConnectionIdleTime has no classic counterpart yet (TODO). Construction failures surface as an Error, not an exception. - detail::ClientCore methods defined, forwarding to the st ClientImpl. closeAsync() bridges the classic CloseCallback onto the continuation Future<void>; shutdown() tears down immediately. - tests/st/StClientTest.cc: 8 broker-free tests covering builder validation, policy mapping, close/shutdown lifecycle, and the typed builder path (newProducer().create() flows through ProducerBuilder -> ClientCore -> st ClientImpl and back out as a typed Expected error). Verified: 50/50 pulsar-st-tests green (macOS/clang, vcpkg build); clang-format-11 clean. Signed-off-by: Matteo Merli <mmerli@apache.org>
The client-side model of a scalable topic's segment layout and the key -> segment router, ported from the Java v5 client (ClientSegmentLayout / SegmentRouter / ScalableTopicHashing) so the two clients route identically. - ScalableTopicHashing: one raw (unmasked) 32-bit Murmur3 hash per key, split into two independent 16-bit halves - high half routes segments, low half routes entry-buckets (PIP-486). Murmur3_32Hash grows a makeRawHash() accessor (the classic makeHash clears bit 31, which would confine the high half to [0, 0x7FFF]), mirroring the same addition on the Java class. - SegmentLayout::fromProto builds an immutable per-epoch view from the broker's ScalableTopicDAG: active segments sorted by hash-range start, sealed segments, per-segment broker addresses (plain + TLS), controller address, legacy-segment markers and entry-bucket splits. Segment topics are computed as segment://tenant/ns/topic/<hhhh-hhhh-id>, matching SegmentTopicName.formatDescriptor. Malformed DAGs (bad scheme, out-of-bounds ranges) surface as Errors. - SegmentRouter: keyed messages route by the 16-bit segment hash to the active segment whose inclusive range contains it; when EVERY active segment is legacy (synthetic layout for a not-yet-migrated regular topic) routing switches to signSafeMod(classicMurmur3(key), N) over segment_id, matching classic partitioned-topic producers exactly; keyless messages round-robin. No-active-segments and uncovered-hash conditions are typed Errors, not exceptions. - tests/st/SegmentLayoutTest.cc: 14 tests porting the Java test semantics - hash split/unmasked-raw properties, layout parse (sorting, URIs, brokers, controller, legacy, splits), malformed-DAG rejection, keyed/deterministic/ single-segment/uncovered/empty routing, all-legacy mod-N parity against the classic hash, mixed-layout range routing, and round-robin coverage. Verified: 64/64 pulsar-st-tests green; pulsarShared still links; gcc-13 parity with the repo warning flags; clang-format-11 clean. Signed-off-by: Matteo Merli <mmerli@apache.org>
Teach the shared transport layer the scalable-topics session protocol; the lib/st DagWatchSession builds on these hooks next. - Commands: newScalableTopicLookup(sessionId, topic, createIfMissing) and newScalableTopicClose(sessionId) factories. newConnect() now advertises supports_scalable_topics - safe on shared connections, since the broker only sends scalable commands on sessions the client explicitly opens. messageType() gains names for all twelve scalable command types; it runs on every incoming command and throws on unknown enum values, so this is required before the client can receive any scalable push. - ClientConnection: a DAG-watch session registry keyed by the client-assigned session id. registerScalableTopicSession() refuses (returns false) on an already-closed connection; SCALABLE_TOPIC_UPDATE dispatches to the registered listener by session_id (an unknown session logs and drops the push - it may race a just-closed session - rather than treating it as a protocol violation, which would kill the shared connection); close() notifies every registered session with (error, nullptr) so it can re-establish on a new connection. - tests/st/ScalableCommandsTest.cc: frame-level roundtrips of both factories (unwrap [totalSize][cmdSize][BaseCommand], reparse, verify fields) and messageType() coverage of all twelve new types. Verified: 68/68 pulsar-st-tests green; pulsarShared/pulsarStatic and the classic pulsar-tests target compile clean; clang-format-11 clean. Signed-off-by: Matteo Merli <mmerli@apache.org>
The lib/st session that keeps a scalable topic's SegmentLayout current, ported from the Java v5 client's DagWatchClient so the two clients behave identically. - start() acquires a connection through the classic client's pool (the topic:// spelling maps to its persistent:// twin for the connection lookup only; the wire lookup keeps the original), verifies the broker advertised scalable-topics support, registers the session on the connection and sends CommandScalableTopicLookup. The returned future completes with the initial layout or the lookup failure. - Updates (initial response and pushes alike) arrive through the connection registry: the resolved topic:// identity from the broker is adopted, the DAG becomes a new SegmentLayout, the reconnect backoff resets, and the layout-change listener fires with (new, old). A malformed first response fails start() instead of hanging it; later malformed pushes keep the last good layout. Broker errors before the first layout fail start() (TopicNotFound mapped to ResultTopicNotFound); afterwards they are treated as transient, matching the Java client. - On connection close after the initial layout the session reconnects with exponential backoff (100ms..30s) and re-issues the lookup; before the initial layout it fails start() rather than retrying behind the caller. Deliberately no epoch gating, same as the Java client: updates are ordered per connection and the old connection's registry is drained before the session re-attaches, so stale pushes cannot arrive. - close() is idempotent: cancels the reconnect timer, unregisters, and sends CommandScalableTopicClose. - ClientConnection now records the broker's CONNECTED feature flags and exposes supportsScalableTopics(), which the session gates on (previously the C++ client ignored broker feature flags entirely). - tests/st/DagWatchSessionTest.cc: the lookup-name mapping and the no-connection lifecycle (fresh state, unique ids, close before start, double close). The full protocol is exercised end-to-end by the producer integration tests against a scalable-topics broker. Verified: 70/70 pulsar-st-tests green; pulsarShared and the classic pulsar-tests target compile clean; clang-format-11 clean. Signed-off-by: Matteo Merli <mmerli@apache.org>
Review feedback on the vcpkg manifest: also include Avro support for reflectcpp. The registry reflectcpp port ships the rfl/avro.hpp headers but has no "avro" feature, so libreflectcpp.a was built without the backend and the headers could not even compile (they include <avro.h>, which was not installed). Add a vcpkg overlay port (vcpkg-overlay/reflectcpp, wired via vcpkg-configuration.json) that adds the feature - REFLECTCPP_AVRO plus an avro-c (1.12.1) dependency - and turn it on in the manifest. Drop the overlay once the upstream port grows the feature. Building against the real backend (instead of the earlier structural stand-in) exposed two API mismatches in AvroSchema.h, now fixed: rfl::avro::to_schema<T>() returns a Schema<T> (use .json_str() for the SchemaInfo definition, a proper Avro record schema), and rfl::avro::write() returns std::vector<char>, not std::string. decode() now uses the pointer+size read overload instead of copying into a string. Verified with a runtime roundtrip through avroSchema<Order>() against the rebuilt reflectcpp[avro] + avro-c: schema derivation, binary encode/decode, and malformed-input-as-Error all pass; 70/70 pulsar-st-tests and the StExamples/pulsarShared builds stay green; clang-format-11 clean. Signed-off-by: Matteo Merli <mmerli@apache.org>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
This is the first implementation PR for the scalable-topics C++ SDK (
pulsar::st), building on the API definition merged in #598. It lays the protocol foundation the producer/consumer implementations will sit on: the wire contract, the build integration forlib/st, the client shell, and the DAG-watch machinery that keeps a scalable topic's segment layout current on the client.The guiding architecture (per PIP-468): a scalable topic's segments are ordinary persistent topics broker-side, so the classic transport stack (connection pool, executors, lookup) is reused wholesale, and
lib/stlayers the scalable-topics machinery on top. The layout/routing/session code is a faithful port of the Java v5 client (ClientSegmentLayout/SegmentRouter/ScalableTopicHashing/DagWatchClient), so the two clients behave identically on the wire and route keys identically.Modifications
One commit per step, in dependency order:
Proto sync —
proto/PulsarApi.protoupdated from apache/pulsar master, picking up the complete scalable-topics protocol: the DAG-watch commands (CommandScalableTopicLookup/Update/Close), segment/DAG messages, consumer-controller commands, topic/TC watch commands, and thesupports_scalable_topicsfeature flag (BaseCommand types 70–81).Build wiring —
lib/stcompiles asST_OBJECT_LIB, a separate object library with per-targetCXX_STANDARD 20(the classic client stays C++17), merged into the samelibpulsarshared/static artifacts.reflectcppjoins the vcpkg manifest, sojsonSchema<T>()builds against the real reflect-cpp. Newpulsar-st-testsgtest binary (no broker needed).Client shell —
pulsar::st::ClientImplwraps a classicpulsar::ClientImpl;PulsarClientBuilder::build()validates the config and maps the st policy structs ontoClientConfiguration;detail::ClientCoremethods forward to it;closeAsync()bridges the classic callback onto the continuationFuture<void>. Operations whose phase hasn't landed fail cleanly withResultOperationNotSupportedinstead of hanging.Segment layout + router —
ScalableTopicHashing(one raw murmur3 per key; high 16 bits route segments, low 16 route entry-buckets per PIP-486 —Murmur3_32Hashgrows amakeRawHash()mirroring the Java addition),SegmentLayout::fromProto(sorted active segments, sealed segments, per-segment broker addresses, controller address, legacy markers, entry-bucket splits,segment://…/hhhh-hhhh-idtopic URIs), andSegmentRouter(range routing; all-legacy synthetic layouts fall back tosignSafeMod(classicMurmur3(key), N)for exact v4 partitioned-producer parity; keyless messages round-robin).Wire plumbing —
Commands::newScalableTopicLookup/Closefactories;supports_scalable_topicsadvertised on CONNECT;messageType()names for all twelve new command types (it runs on every incoming command and throws on unknown values);ClientConnectiongains a DAG-watch session registry keyed by the client-assigned session id,SCALABLE_TOPIC_UPDATEdispatch (unknown sessions drop the push rather than killing the shared connection), close-notification for re-establishment, and now records the broker's CONNECTED feature flags (supportsScalableTopics()).DagWatchSession— the per-topic session tying it together: connect through the classic pool, gate on broker support, register + send the lookup; updates adopt the broker-resolvedtopic://identity and swap theSegmentLayout(listener notified with new/old); reconnect with exponential backoff (100ms–30s) after the initial layout, fail-fast before it; broker errors before the first layout failstart()(TopicNotFoundmapped); idempotentclose()sendsCommandScalableTopicClose.Avro backend — the registry reflectcpp port ships
rfl/avro.hppbut exposes noavrofeature, so the backend wasn't compiled and<avro.h>wasn't installed. A vcpkg overlay port (vcpkg-overlay/reflectcpp, wired viavcpkg-configuration.json) adds the feature (REFLECTCPP_AVRO+avro-c); building against the real backend exposed and fixed two API mismatches inAvroSchema.h(to_schema<T>().json_str(),write()→std::vector<char>). Verified with a runtimeavroSchema<T>()roundtrip. The overlay can be dropped once the upstream port grows the feature.What works after this PR
PulsarClient::builder().serviceUrl(…).build()returns a live client on the real connection pool;close()/shutdown()work; the DAG-watch session can open, receive layouts, survive reconnects, and close.jsonSchemaandavroSchemabuild and roundtrip against the real reflect-cpp. Producer/consumer creation intentionally returnsResultOperationNotSupporteduntil their phases land.Next phases (follow-up PRs)
Producer (per-segment fan-out driven by the watch session, keyed routing, st
MessageId), then Queue/Stream/Checkpoint consumers, the C API, and transactions.Verifying this change
pulsar-st-tests: 70 broker-free tests coveringExpected/Future(incl. coroutine awaiter and abandoned-promise), the Schema codecs, builder validation and client lifecycle, layout parsing/routing (incl. mod-N parity with the classic hash), wire-level command roundtrips, and session lifecycle.pulsarShared/pulsarStatic/pulsar-tests/StExamples(vcpkg, macOS); gcc-13 parity compiles with the repo warning flags; clang-format-11 clean.Documentation
docdoc-not-needed