Scalable Topics: new typed C++ SDK (pulsar::st) — API definition #598
Merged
Header-only public API for the scalable-topics SDK under a new pulsar::st namespace (PIP-460/468/483): client, producers, the three consumer modes, transactions, schemas (reflect-cpp JSON/Avro and protobuf), and the Expected<T>/Future<T> result types, plus examples under examples/st. API definition only -- no lib/st implementation or C API yet. The new API requires C++20; the rest of the client stays C++17. Signed-off-by: Matteo Merli <mmerli@apache.org>
- Apply clang-format-11 to the new pulsar::st headers and examples (the Formatting Check uses clang-format 11; local 18 formats differently).
- examples/CMakeLists.txt: build the four dependency-free st samples unconditionally and add the reflect-cpp JSON sample only when reflectcpp is found (find_package CONFIG QUIET instead of REQUIRED), so configure no longer fails where reflect-cpp is absent (e.g. the CodeQL/Analyze job).
- vcpkg.json: drop the reflectcpp dependency for now; it returns with the lib/st implementation that actually exercises the JSON/Avro schemas.
Signed-off-by: Matteo Merli <mmerli@apache.org>
GCC's -Wmissing-field-initializers (-Wextra, and the build is -Werror) fires
on a partial designated-initializer such as
.deadLetterPolicy({.maxRedeliverCount = 5}) for every omitted member that
lacks a default member initializer. clang does not warn, so this was missed
locally. Give every optional field in the user-facing policy/ack/DLQ structs
an '= std::nullopt' NSDMI so designated-init of any subset is warning-clean.
Verified with gcc:13 -Wextra -Werror against all four st examples.
Signed-off-by: Matteo Merli <mmerli@apache.org>
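To illustrate the NSDMI fix described in this commit, here is a minimal sketch. `DeadLetterPolicySketch`, its fields other than `maxRedeliverCount`, and `makePolicy()` are hypothetical stand-ins, not the definitions from the PR headers.

```cpp
#include <chrono>
#include <optional>
#include <string>

// Hypothetical stand-in for a user-facing policy struct. Only the
// maxRedeliverCount name comes from the commit message; the other fields
// are assumptions made for illustration.
struct DeadLetterPolicySketch {
    // Every optional field carries an explicit default member initializer.
    std::optional<int> maxRedeliverCount = std::nullopt;
    std::optional<std::string> deadLetterTopic = std::nullopt;
    std::optional<std::chrono::milliseconds> retryDelay = std::nullopt;
};

// Partial designated initialization (C++20): the omitted members fall back
// to their default member initializers.
inline DeadLetterPolicySketch makePolicy() {
    return DeadLetterPolicySketch{.maxRedeliverCount = 5};
}
```

Per the commit message, it is the presence of the default member initializers that keeps GCC's -Wmissing-field-initializers quiet for the omitted fields.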
Addresses PR review feedback on the Schema decode signature. The SerDe seam now takes a std::span<const char> instead of (const char*, size_t), and returns Expected<T> instead of T -- so malformed bytes or an unset schema are error values rather than a non-opt-in throw, consistent with the rest of the API. Message<T>::value() returns Expected<T> accordingly.
- built-in numeric codecs report a short payload as ResultInvalidMessage;
- the reflect-cpp JSON/Avro SerDes map a parse failure to an Error instead of letting rfl's .value() throw;
- the protobuf SerDe now checks ParseFromArray's result;
- a custom SerDe may still return a plain T (infallible) -- it converts implicitly to Expected<T>.
encode keeps throwing on an unset schema (a configuration error). Examples updated to check the decoded value.
Verified with clang + gcc:13 (-Wextra -Werror) and clang-format-11.
Signed-off-by: Matteo Merli <mmerli@apache.org>
This reverts commit 46d3f6d.
Per PR review on the Schema encode/decode signatures.
SerDe seam (Schema<T> + JSON/Avro/protobuf factories):
- encode writes into a caller-provided, reusable std::vector<std::byte>& (no per-message allocation) and returns Expected<void>;
- decode takes std::span<const std::byte> and returns Expected<T>, so malformed input / an unset schema are error values rather than throws;
- Bytes is now std::vector<std::byte>.
Client-facing API unchanged: Message<T>::value() still returns T (decode failures are handled inside the SDK), Producer::send(const T&) and the examples are as before; a rare encode error is stashed in the builder and surfaces from send()/sendAsync().
Zero-copy bytes: new BytesView = std::span<const std::byte>. Schema<BytesView> is the zero-copy counterpart of Schema<Bytes> -- Producer<BytesView> publishes the caller's bytes without copying (the caller keeps them valid until the send completes) and Message<BytesView>::value() returns a view into the message buffer. OutgoingMessage carries an optional non-owning view.
Verified with clang + gcc:13 (-Wextra -Werror), clang-format-11, and a runtime check that decode returns a view at the same address.
Signed-off-by: Matteo Merli <mmerli@apache.org>
Per PR review: the string accessors return views instead of owning references/copies, so the lib/st impl is not forced to store a std::string per field -- it can return a view into whatever it already holds.
- consumer/producer topic() / subscription() / consumerName() / name() and Message::topic() now return std::string_view (Message::topic() previously copied); the detail::*Core declarations they forward to return string_view too.
- Message::key() / producerName() / replicatedFrom() now return std::optional<std::string_view>.
- Error::message() stays const std::string& (an Error is usually a temporary, so auto-capturing a const ref copies safely whereas a view would dangle).
Returned views are valid while the source object (message / consumer / producer) is alive. All within pulsar::st; the old API is untouched.
Verified with clang + gcc:13 (-Wextra -Werror), static_asserts on the return types, and clang-format-11.
Signed-off-by: Matteo Merli <mmerli@apache.org>
toByteArray() returns std::vector<std::byte> and fromByteArray() takes std::span<const std::byte>, instead of std::string -- byte-correct and consistent with Bytes/BytesView. The round-trip stays implicit: a std::vector<std::byte> from toByteArray() converts to the span parameter. Example updated. All within pulsar::st. Verified with clang + gcc:13 (-Wextra -Werror) and clang-format-11. Signed-off-by: Matteo Merli <mmerli@apache.org>
property(const std::string& k, const std::string& v) becomes property(std::string k, std::string v) with insert_or_assign(std::move(k), std::move(v)), across MessageBuilder, ProducerBuilder, and the three consumer builders -- consistent with the other by-value-sink setters (topic / subscriptionName / etc.). Verified with clang + gcc:13 (-Wextra -Werror) and clang-format-11. Signed-off-by: Matteo Merli <mmerli@apache.org>
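The by-value sink pattern this commit describes could look roughly like the following. `BuilderSketch` and its `std::map` storage are assumptions for illustration; the real builders and their container type are defined in the PR headers.

```cpp
#include <map>
#include <string>
#include <utility>

// Hypothetical builder showing a by-value "sink" setter: the caller decides
// whether to copy (pass an lvalue) or move (pass an rvalue), and the setter
// always moves into storage.
class BuilderSketch {
public:
    BuilderSketch& property(std::string k, std::string v) {
        // insert_or_assign overwrites an existing key instead of keeping
        // the first value, as plain insert() would.
        properties_.insert_or_assign(std::move(k), std::move(v));
        return *this;
    }

    const std::map<std::string, std::string>& properties() const { return properties_; }

private:
    std::map<std::string, std::string> properties_;  // container type is an assumption
};
```

With this shape, a call such as `builder.property(std::move(key), "v")` costs no string copy for the key, while `builder.property(key, "v")` copies exactly once.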
await_suspend now returns bool and uses SharedState::addListenerOrReady, which atomically registers the resume continuation or reports the result is already available -- so the coroutine resumes via await_resume instead of being resumed from inside await_suspend (which could run/destroy the awaiter before it returns). Verified with a co_await runtime test on clang + gcc:13. Signed-off-by: Matteo Merli <mmerli@apache.org>
A message whose payload cannot be decoded is handled internally by the SDK and never delivered, so decode is not a receive failure. Dropped it from the receive failure lists on all three consumers and added a clarifying note. Signed-off-by: Matteo Merli <mmerli@apache.org>
Match the existing client's canonical primitive names (lib/Schema.cc: STRING/INT32/INT64/FLOAT/DOUBLE/BYTES); StringCodec/DoubleCodec used mixed-case 'String'/'Double'. The name is sent to the broker. Signed-off-by: Matteo Merli <mmerli@apache.org>
MessageBuilder::value() now clears encodeError_ on success instead of leaving a prior failure sticky, so a later successful value() doesn't surface a stale error at send()/sendAsync(). Signed-off-by: Matteo Merli <mmerli@apache.org>
OutgoingMessage::eventTime and MessageCore::eventTime() are now std::optional<Timestamp> instead of an int64 epoch-ms with a 0=unset sentinel, so an event time of exactly the Unix epoch is no longer indistinguishable from unset. The int64 epoch-ms is just the wire encoding (converted in lib/st); MessageBuilder::eventTime and Message::eventTime() simplify accordingly. Verified epoch != unset at runtime on clang + gcc:13. Signed-off-by: Matteo Merli <mmerli@apache.org>
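A small sketch of why the optional removes the ambiguity. It assumes `Timestamp` is an alias for `std::chrono::system_clock::time_point`, which this page does not confirm; `OutgoingSketch` and `hasEventTime` are hypothetical names.

```cpp
#include <chrono>
#include <optional>

// Assumption: Timestamp is a system_clock time_point.
using Timestamp = std::chrono::system_clock::time_point;

// Hypothetical stand-in for the relevant part of OutgoingMessage.
struct OutgoingSketch {
    std::optional<Timestamp> eventTime;  // nullopt means "not set"
};

// "Is it set?" is answered by the optional, not by comparing against 0.
inline bool hasEventTime(const OutgoingSketch& m) { return m.eventTime.has_value(); }
```

A value-initialized `Timestamp{}` sits at the clock's epoch, so a message carrying it has a zero tick count yet still reports as set, which an int64 with a 0 sentinel could not express.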
Parity with Stream/QueueConsumer -- the consumerName config field and builder setter existed, but the getter did not, so the name could be set but not read. Signed-off-by: Matteo Merli <mmerli@apache.org>
…review P3) toByteArray() / fromByteArray() / earliest() / latest() return values that must not be silently discarded. Signed-off-by: Matteo Merli <mmerli@apache.org>
It returns a reference into the message, like the other view-returning getters. Signed-off-by: Matteo Merli <mmerli@apache.org>
…-on) OutgoingMessage::deliverAt is now std::optional<Timestamp> (was int64 epoch-ms with 0=immediate), matching eventTime; deliverAfter/deliverAt set it directly, and the now-unused toEpochMs helper is removed. Verified on clang + gcc:13. Signed-off-by: Matteo Merli <mmerli@apache.org>
MessageCore::publishTime() now returns Timestamp (was int64_t publishTimeMs()); Message::publishTime() forwards it directly. Consistent with the eventTime / deliverAt Timestamp modeling; the int64 epoch-ms is just the wire encoding (converted in lib/st). Signed-off-by: Matteo Merli <mmerli@apache.org>
- P1: rename Producer::name() -> producerName() (+ ProducerCore), consistent with Message::producerName() and the producerName builder setter.
- G3: add MessageBuilder::replicationClusters() setter for the previously unreachable OutgoingMessage::replicationClusters field.
- Q1: drop the 'ordering key' framing from the message-key docs -- it is a routing / partition key; ordering is provided by the StreamConsumer.
Signed-off-by: Matteo Merli <mmerli@apache.org>
FloatCodec (FLOAT, big-endian IEEE-754), Int8Codec (INT8), Int16Codec (INT16), wired into the default Schema<T> ctor; canonical uppercase names match the existing client. Round-trip verified on clang + gcc:13. bool is NOT added: the existing pulsar::SchemaType enum has no BOOLEAN value (Java has it at 5; the C++ port skipped it), and adding it would mean touching the old API. Deferred pending a decision on extending the old enum. Signed-off-by: Matteo Merli <mmerli@apache.org>
MessageCore::data() / Message<T>::data() now return std::span<const std::byte> (BytesView), carrying pointer and length together; the separate size() accessor is removed (use data().size()), and Message<T>::value() simplifies accordingly. Consistent with the Bytes/BytesView byte modeling. Verified on clang + gcc:13. Signed-off-by: Matteo Merli <mmerli@apache.org>
PulsarClientBuilder drops the top-level ioThreads / messageListenerThreads / memoryLimit / listenerName setters. Grouped by scope:
- listenerName -> ConnectionPolicy
- ioThreads + messageListenerThreads -> new ThreadPolicy
- memoryLimit -> new MemoryPolicy
with threadPolicy() / memoryPolicy() builder setters.
Verified on clang + gcc:13.
Signed-off-by: Matteo Merli <mmerli@apache.org>
MessageCore::key() / producerName() / replicatedFrom() now return std::optional<std::string_view> directly instead of a paired hasX() bool + string_view accessor -- the optional carries the present/absent signal. Message<T>'s wrappers collapse to direct forwards. Verified on clang + gcc:13. Signed-off-by: Matteo Merli <mmerli@apache.org>
detail::*Core receiveAsync/receiveMultiAsync now take std::chrono::milliseconds (matching the public receive() signatures), so the public methods forward the typed timeout directly instead of calling .count(). <cstdint> swapped for <chrono> in the cores (int64_t was only the timeout). Verified on clang + gcc:13. Signed-off-by: Matteo Merli <mmerli@apache.org>
Replace the bool hasKey + std::string key pair on OutgoingMessage with a single std::optional<std::string> key, mirroring the read-side MessageCore::key() -> std::optional<std::string_view>. nullopt means no routing key. MessageBuilder::key() now just assigns the optional. Signed-off-by: Matteo Merli <mmerli@apache.org>
Drop the -1 sentinel on OutgoingMessage::sequenceId in favor of std::optional<int64_t>; unset means auto-assign. Avoids a custom in-band encoding of 'no explicit sequence id'. MessageBuilder::sequenceId() just assigns the optional. Signed-off-by: Matteo Merli <mmerli@apache.org>
Drop the -1 sentinel on the read side too: lastSequenceId() now returns std::nullopt when nothing has been published yet, instead of -1. Updates detail::ProducerCore to match. Signed-off-by: Matteo Merli <mmerli@apache.org>
rfl::{json,avro}::write() can throw, which would escape encode()'s
Expected<void> non-throwing contract. Wrap the body in try/catch and
report failures as unexpected(ResultInvalidMessage, ...), mirroring the
existing decode() guard. info() (schema derivation) has no error channel
and stays off the non-throwing path; document that on the factories.
Signed-off-by: Matteo Merli <mmerli@apache.org>
A zero-copy Schema<BytesView> send publishes the viewed bytes directly, so they must outlive the send. The returned future is the only completion signal; discarding it (fire-and-forget) leaves no safe point to free the bytes. Document this on sendAsync(). Signed-off-by: Matteo Merli <mmerli@apache.org>
MessageId could not be used as a key in unordered_map/unordered_set. Add a std::hash<MessageId> specialization (operator() defined in lib/st, consistent with operator==: equal ids hash equal); befriend it so it can read the impl. Give Checkpoint a hidden-friend operator<< mirroring MessageId's, so both opaque position types stream the same way for logging/debugging. Signed-off-by: Matteo Merli <mmerli@apache.org>
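For readers unfamiliar with the pattern, a `std::hash` specialization plus a hidden-friend `operator<<` might be sketched as below. `IdSketch` is a hypothetical transparent type with made-up fields; the real MessageId is opaque and its hash is defined in lib/st.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <ostream>
#include <unordered_set>

// Hypothetical position type with visible fields, for illustration only.
struct IdSketch {
    std::int64_t ledger;
    std::int64_t entry;

    // Hidden friends: found only through argument-dependent lookup.
    friend bool operator==(const IdSketch& a, const IdSketch& b) {
        return a.ledger == b.ledger && a.entry == b.entry;
    }
    friend std::ostream& operator<<(std::ostream& os, const IdSketch& id) {
        return os << id.ledger << ':' << id.entry;
    }
};

// The hash must agree with operator==: equal ids hash equal.
template <>
struct std::hash<IdSketch> {
    std::size_t operator()(const IdSketch& id) const noexcept {
        const std::size_t h = std::hash<std::int64_t>{}(id.ledger);
        // Simple hash-combine step; the mixing constant is an arbitrary choice.
        return h ^ (std::hash<std::int64_t>{}(id.entry) + 0x9e3779b9u + (h << 6) + (h >> 2));
    }
};
```

With the specialization in place, `std::unordered_set<IdSketch>` and `std::unordered_map<IdSketch, V>` work without a custom hasher argument.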
PulsarClient, Producer, the three consumers, and Transaction are shared-state handles that must stay cheaply copyable and movable. They relied on implicitly-generated special members, which a later user-declared destructor would silently suppress (turning the move into a copy or deleting it). Declare copy/move = default explicitly on all six to lock in handle value semantics and make the intent visible. Signed-off-by: Matteo Merli <mmerli@apache.org>
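A minimal sketch of the hazard this commit guards against, using a hypothetical `HandleSketch` that wraps a `shared_ptr` (the real handles wrap their own shared state):

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical shared-state handle. With the user-declared destructor and
// without the explicit "= default" lines, the implicit move operations
// would not be generated and std::move(handle) would silently copy.
class HandleSketch {
public:
    explicit HandleSketch(std::shared_ptr<int> state) : state_(std::move(state)) {}

    HandleSketch(const HandleSketch&) = default;
    HandleSketch(HandleSketch&&) noexcept = default;
    HandleSketch& operator=(const HandleSketch&) = default;
    HandleSketch& operator=(HandleSketch&&) noexcept = default;

    ~HandleSketch() {}  // user-declared on purpose, to show the moves survive

    explicit operator bool() const { return static_cast<bool>(state_); }

private:
    std::shared_ptr<int> state_;
};

static_assert(std::is_copy_constructible_v<HandleSketch>);
static_assert(std::is_move_constructible_v<HandleSketch>);
```

The observable difference is in the moved-from object: with a real move the source handle ends up empty, whereas a fallback copy would leave it still pointing at the shared state.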
thenApply assumed a non-void, copyable mapper: it called setValue(f(...)) (ill-formed when f returns void) and moved f straight into the std::function listener (ill-formed when f is move-only, since std::function requires a copyable target). Branch on the result type with if constexpr - a void mapper runs and then completes the Future<void> via setSuccess() - and hold f in a shared_ptr so the copyable listener can carry a move-only mapper. Verified at runtime (normal, void, move-only, and error-propagation paths) on clang and gcc. Signed-off-by: Matteo Merli <mmerli@apache.org>
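The two techniques named in this commit (branching on the mapper's result type with `if constexpr`, and holding the mapper in a `shared_ptr` so a copyable listener can carry it) can be sketched in isolation. `SourceSketch`, `ResultSlot`, and `thenApplySketch` are hypothetical and much simpler than the PR's Future/SharedState; there is no error propagation or thread safety here.

```cpp
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>

// Hypothetical completion source with one std::function listener, which
// requires a copyable callable.
template <typename T>
struct SourceSketch {
    std::function<void(const T&)> listener;
    void complete(const T& v) {
        if (listener) listener(v);
    }
};

// Hypothetical result holder; the void case only records completion.
template <typename R>
struct ResultSlot {
    std::optional<R> value;
};
template <>
struct ResultSlot<void> {
    bool done = false;
};

template <typename T, typename F>
auto thenApplySketch(SourceSketch<T>& src, F f) {
    using R = std::invoke_result_t<F&, const T&>;
    auto slot = std::make_shared<ResultSlot<R>>();
    // shared_ptr<F> is copyable even when F itself is move-only.
    auto fn = std::make_shared<F>(std::move(f));
    src.listener = [slot, fn](const T& v) {
        if constexpr (std::is_void_v<R>) {
            (*fn)(v);  // run the mapper, then signal completion
            slot->done = true;
        } else {
            slot->value = (*fn)(v);
        }
    };
    return slot;
}
```

A mapper that captures a `std::unique_ptr` is accepted by this sketch, which would not be the case if `f` were moved directly into the `std::function`.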
A detail::Promise dropped without being completed left its SharedState forever pending, so Future::get() (and listeners / co_await) blocked indefinitely. Add a Guard shared by every copy of a Promise: when the last copy is destroyed it completes the state with an error (ResultUnknownError, "promise abandoned before completion") unless something already fulfilled it. complete() is idempotent, so a normally-completed promise is unaffected, and destroying one copy among several does not trip it. Verified at runtime (single/copied/void abandonment, partial-copy safety, completed no-op) on clang and gcc. Signed-off-by: Matteo Merli <mmerli@apache.org>
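The shared-guard idea can be sketched as follows. `StateSketch` and `PromiseSketch` are hypothetical, single-threaded simplifications; the outcome is a plain string rather than the PR's Error type, and only the abandonment message is taken from the commit.

```cpp
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical shared state: empty outcome means "still pending".
struct StateSketch {
    std::optional<std::string> outcome;
    // Idempotent: only the first completion is recorded.
    void complete(std::string o) {
        if (!outcome) outcome = std::move(o);
    }
};

class PromiseSketch {
public:
    PromiseSketch()
        : state_(std::make_shared<StateSketch>()), guard_(std::make_shared<Guard>(state_)) {}

    void setValue() { state_->complete("ok"); }
    std::shared_ptr<StateSketch> state() const { return state_; }

private:
    // One Guard is shared by every copy of the promise, so its destructor
    // runs only when the last copy goes away.
    struct Guard {
        explicit Guard(std::shared_ptr<StateSketch> s) : state(std::move(s)) {}
        ~Guard() { state->complete("promise abandoned before completion"); }
        std::shared_ptr<StateSketch> state;
    };

    std::shared_ptr<StateSketch> state_;
    std::shared_ptr<Guard> guard_;
};
```

Because `complete()` ignores later calls, a promise that was fulfilled normally keeps its result when the guard fires, and destroying one copy among several does nothing.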
AckPolicy::negativeAckRedeliveryDelay only applies to a QueueConsumer. A StreamConsumer acknowledges cumulatively and has no negative-ack path, so the field is silently ignored there. Document that on the StreamConsumer config field and the ackPolicy() setter. Signed-off-by: Matteo Merli <mmerli@apache.org>
The topic-vs-namespace target is a bool + two strings, so the POD config can represent invalid combinations the type system does not prevent - including the default-constructed value (single-topic mode with an empty topic). Document that such states (no target, or missing subscriptionName) are rejected by create()/createAsync() with an Error, and that fields not selected by useNamespace are ignored. (A variant target could make these unrepresentable, but that diverges from the POD-config + designated-init pattern used across the API.) Signed-off-by: Matteo Merli <mmerli@apache.org>
…tConsumerAsync Match the create<Thing>Async naming of its siblings (createProducerAsync) and the CheckpointConsumer type it returns. Internal detail rename; no public API change. Signed-off-by: Matteo Merli <mmerli@apache.org>
value_or, and_then, transform, and or_else were const&-only: they copied the contained value into the continuation, and value_or/and_then/transform would not even compile for a move-only T. Add &&-qualified overloads that move the contained value (and forward the error by move), so a move-only or expensive-to-copy T flows through the chain without a copy. value() and operator* already had ref-qualified overloads. Verified at runtime with a move-only payload (unique_ptr) on clang and gcc, plus an lvalue regression pass. Signed-off-by: Matteo Merli <mmerli@apache.org>
N1 Expected operator*/operator-> are not UB on an error: operator* is
noexcept + std::get so it terminates; operator-> returns nullptr. Correct
the docs to say so.
N2 Drop redundant unit prose ("in milliseconds"/"in seconds") from
std::chrono fields/params in Policies, Consumer (AckPolicy) and the
sendTimeout setter; the type already states the unit. (ProducerConfig's
int64 sendTimeoutMs keeps its "milliseconds" note - it is not a chrono type.)
N3 decodeBigEndian: replace the dead `i < data.size()` guard (all codecs
length-check first) with an assert of that precondition.
N4 ProtobufNativeSchema: guard the size_t->int narrowing in encode/decode,
rejecting messages larger than INT_MAX instead of passing a wrapped size.
N5 OutgoingMessage: one-line note for the usesView<->payloadView invariant.
N7 Wrap the SerDeFor concept in clang-format off/on so clang-format-11 stops
mangling the `{ expr } -> Concept;` compound requirements.
N6 Normalize config-struct field docs to the dominant /** */-before style
(OutgoingMessage, CheckpointConsumerConfig, Stream/QueueConsumerConfig);
enum-value ///< trailing docs are left as-is.
Verified: clang-format-11 clean; examples compile (clang); N3 runtime test
and N4 (protobuf stub) pass on clang and gcc.
Signed-off-by: Matteo Merli <mmerli@apache.org>
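The N4 narrowing guard amounts to a range check before the size_t-to-int cast. A minimal sketch, with `checkedSize` as a hypothetical helper name and `std::optional` standing in for the error value the real code reports:

```cpp
#include <climits>
#include <cstddef>
#include <optional>

// Returns the size as an int, or nullopt when it would not fit. Without the
// check, a static_cast<int> of a larger size_t would yield a wrapped value.
inline std::optional<int> checkedSize(std::size_t n) {
    if (n > static_cast<std::size_t>(INT_MAX)) {
        return std::nullopt;
    }
    return static_cast<int>(n);
}
```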
    concept SerDeFor = requires(const S& serde, const T& value, std::span<const std::byte> data,
                                std::vector<std::byte>& out) {
        { serde.info() } -> std::convertible_to<SchemaInfo>;
        { serde.encode(value, out) } -> std::convertible_to<Expected<void>>;
Contributor
Should we return Expected<std::vector<std::byte>> instead of passing a mutable reference to std::vector<std::byte>?
Contributor
Author
The reason for passing the vector as an argument is that you can reuse the same vector across calls. If we returned it, we would have to allocate a new one each time.
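The reuse argument can be made concrete with a small sketch. `encodeInt32` is a hypothetical encoder that returns `bool` where the PR uses Expected<void>, and the big-endian layout is an assumption for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical encoder writing into a caller-provided buffer.
inline bool encodeInt32(std::int32_t value, std::vector<std::byte>& out) {
    out.clear();  // drops the old contents but keeps the allocated capacity
    const auto u = static_cast<std::uint32_t>(value);
    for (int shift = 24; shift >= 0; shift -= 8) {
        out.push_back(static_cast<std::byte>((u >> shift) & 0xFFu));
    }
    return true;
}
```

After the first call the buffer already has room for four bytes, so later calls on the same vector write into the existing storage. A signature returning the vector by value would construct a fresh one per message.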
BewareMyPower (Contributor) approved these changes on Jul 1, 2026 and left a comment:
The rest of the code LGTM; it can be merged once the comment is addressed.
What's in this PR

- include/pulsar/st/*.h: the full public API, fully Doxygen-documented:
  - PulsarClient + PulsarClientBuilder (+ grouped config in Policies.h)
  - Producer<T> / MessageBuilder<T> / ProducerBuilder<T>
  - StreamConsumer<T> (ordered, cumulative-ack), QueueConsumer<T> (shared, individual-ack + DLQ), CheckpointConsumer<T> (externally-held position)
  - Transaction, Message<T> / Messages<T>, MessageId, Checkpoint
  - Schema<T> + jsonSchema / avroSchema / protobufNativeSchema
  - Expected<T> (sync results) and Future<T> (async results)
- include/pulsar/st/detail/*.h: the internal PIMPL boundary the implementation will fill in (*Core classes, SharedState).
- examples/st/*: usage examples (compile under C++20; not linkable until lib/st lands). The four core samples build dependency-free; the JSON-schema sample builds only where reflect-cpp is present.

Design decisions (and why)
Typed facade over a byte-oriented core

Schema<T>, Producer<T>, Message<T>, Consumer<T> are thin header templates over a non-templated, type-erased core (the detail::*Core classes, to be implemented in lib/st). This gives a Java-like typed API while keeping the heavy code non-templated and out of headers, and lets the forthcoming C API bind to the same core.

Errors are values (Expected<T>), not exceptions

Synchronous calls return Expected<T>: a std::expected-shaped, [[nodiscard]] value-or-Error. It is non-throwing by default; .value() opts into throwing ClientException.

-fno-exceptions embedders. The closest analogs, gRPC (grpc::Status), librdkafka (error codes), and the DataStax C/C++ driver (CassError), all use error values on the operational path, not exceptions. [[nodiscard]] closes the one real weakness of value-errors: silently ignoring them.

Async is a continuation Future<T>, not std::future

...Async calls return a Future<T> with addListener() (react on completion, no blocking), get() / get(timeout) (block on demand), thenApply() (map), and co_await (C++20).

std::future? It has no continuation: you can only block on .get(), which defeats the purpose of async in an event-loop app. Every comparable library uses continuations instead: folly (.thenValue), Seastar (.then, which explicitly rejects std::future), Asio (completion tokens), gRPC (callbacks), the DataStax driver (cass_future_set_callback).

pulsar::st targets C++20 (the rest of the client stays C++17)

Only the new API requires C++20; existing headers and users are untouched (a detail/Cxx20.h guard emits a friendly error otherwise). This buys: reflect-cpp (zero-boilerplate JSON/Avro), concepts (clean Schema/SerDe constraints), coroutines (co_await on Future<T>), using enum (unqualified result codes), and operator<=>.

Schema is a type-erased value over a pluggable SerDe
Schema<T> holds a SerDe (info / encode / decode). Built-ins cover primitives; structured types use:

- protobufNativeSchema<T>(): fully automatic (protobuf reflection; already a dependency),
- jsonSchema<T>() / avroSchema<T>(): zero boilerplate via reflect-cpp (pass a plain struct; nested structs and containers included),
- Schema<T>(serde).

This replaced an earlier type-keyed specialization model, which couldn't express "the same T encoded different ways" (JSON vs Avro).

PIMPL via non-templated *Core classes (in detail/)

A header-only templated facade can't do the library's usual .cc-only PIMPL: a template must see a declaration of whatever it calls. So each entity has a non-templated *Core (ProducerCore, StreamConsumerCore, …, ClientCore) holding the hidden *Impl, with methods declared in the header and defined in lib/st; the typed classes are thin wrappers that forward to it, and Future<T>::thenApply carries the typed mapping. The cores live in pulsar::st::detail and under detail/, so they're out of the public namespace and excluded from generated docs.

Consumer acks are fire-and-forget
void acknowledge / acknowledgeCumulative / negativeAcknowledge don't block and don't return an error: acks are buffered/best-effort and a lost ack just means redelivery (matching the new Java SDK). Transactional acks surface their outcome at Transaction::commit().

Client configuration grouped into policy structs
ConnectionPolicy, BackoffPolicy, TlsPolicy, TransactionPolicy, MemorySize (client) and AckPolicy, DeadLetterPolicy (consumer), mirroring the Java v5 config package and reading cleanly via C++20 designated initializers.

Smaller calls
- blockIfQueueFull defaults to true.
- inNamespace(...): namespace is a C++ keyword.
- PulsarClient's default constructor is private; build only through the builder (matching the existing Client).
- explicit operator bool() (dropped the redundant valid()).
- receive() returns Expected<Message<T>> because a receive can fail without a message (closed / disconnected).

Open questions for reviewers
- jsonSchema<T>() / avroSchema<T>() are built on reflect-cpp, but this API-only PR does not yet add it to vcpkg.json: the JSON sample is gated behind find_package(reflectcpp), and reflect-cpp will be wired into the manifest with the lib/st implementation that actually exercises it. The reflect-cpp calls here are verified against a stand-in only. Open question: acceptable as a hard dep of the JSON/Avro schema headers, or should it be optional? (The reflectcpp vcpkg port 0.24.0 also ships no Avro backend yet, so avroSchema<T>() needs one before it can build; JSON works via reflect-cpp's core yyjson.)
- pulsar::Result (re-exported via using enum); a scoped enum class st::ErrorCode would read cleaner but needs a translation layer.
- inNamespace, ackPolicy, the policy field sets, etc.: open to bikeshedding.

Status / verification
-Wall -Werror; examples are syntax-checked; ProtobufNativeSchema.h compiles against real vcpkg protobuf; the reflect-cpp headers against a stand-in rfl API.

lib/st. Planned follow-ups: the lib/st implementation (proto commands → DAG-watch session → producer path → consumers → transactions/DLQ), the C API, and wiring the examples into the build.