Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e08e1b3
Scalable Topics: typed C++ SDK public API (pulsar::st)
merlimat Jun 23, 2026
fa4def3
Fix CI: clang-format the st sources; make reflect-cpp optional
merlimat Jun 23, 2026
afda557
Fix CI: give st config-struct fields default member initializers
merlimat Jun 23, 2026
399b791
Merge remote-tracking branch 'apache/main' into scalable-topics-cpp-api
merlimat Jun 29, 2026
46d3f6d
st: decode returns Expected<T> and takes std::span<const char>
merlimat Jun 30, 2026
967a09d
Revert "st: decode returns Expected<T> and takes std::span<const char>"
merlimat Jun 30, 2026
e470720
st: byte-buffer SerDe seam + zero-copy BytesView
merlimat Jun 30, 2026
ac616bb
st: return std::string_view from string accessors
merlimat Jun 30, 2026
901b9eb
st: serialize MessageId/Checkpoint as bytes, not std::string
merlimat Jun 30, 2026
9dffe5a
st: make property() a by-value sink (review item K1)
merlimat Jun 30, 2026
4853693
st: fix await_suspend coroutine resume race (review B1)
merlimat Jun 30, 2026
c36448e
st: receive() does not surface decode errors -- doc fix (review B2)
merlimat Jun 30, 2026
7c6043e
st: uppercase primitive schema names STRING/DOUBLE (review B4)
merlimat Jun 30, 2026
d2c91db
st: reset encodeError_ on a successful encode (review B5)
merlimat Jun 30, 2026
d3e914d
st: model event time as std::optional<Timestamp> (review B3)
merlimat Jun 30, 2026
b234d8e
st: add CheckpointConsumer::consumerName() (review G2)
merlimat Jun 30, 2026
c65e6df
st: [[nodiscard]] on MessageId/Checkpoint serialization + sentinels (…
merlimat Jun 30, 2026
f7affbc
st: document Message::properties() view lifetime (review P4)
merlimat Jun 30, 2026
3730ff0
st: model deliverAt as std::optional<Timestamp> too (review B3 follow…
merlimat Jun 30, 2026
db9607b
st: model publishTime as Timestamp, not int64 epoch-ms
merlimat Jun 30, 2026
ebb7268
st: producer review items P1, G3, Q1
merlimat Jun 30, 2026
db0f940
st: add float/int8/int16 primitive codecs (review G4/Q4)
merlimat Jun 30, 2026
4013ba6
st: Message::data() returns BytesView, not const char* + size()
merlimat Jun 30, 2026
85114a4
st: group loose client settings into policies by scope (review Q3)
merlimat Jun 30, 2026
e497536
st: MessageCore optional accessors, drop hasX() bools
merlimat Jun 30, 2026
39a305f
st: receive cores take std::chrono::milliseconds, not int64_t timeoutMs
merlimat Jun 30, 2026
d17d94a
st: OutgoingMessage key -> std::optional<std::string>
merlimat Jun 30, 2026
cf6ed88
st: OutgoingMessage sequenceId -> std::optional<int64_t>
merlimat Jun 30, 2026
c3ac06d
st: Producer::lastSequenceId() -> std::optional<int64_t>
merlimat Jun 30, 2026
14c1e92
st: P2 - guard rfl encode() against throwing
merlimat Jun 30, 2026
5808898
st: P5 - warn about fire-and-forget + BytesView dangling
merlimat Jun 30, 2026
0ab60f7
st: P6 - std::hash<MessageId> + Checkpoint operator<<
merlimat Jun 30, 2026
00af737
st: P7 - explicitly default copy/move on handles
merlimat Jun 30, 2026
7d2dd84
st: P8 - thenApply supports void-returning and move-only mappers
merlimat Jun 30, 2026
22b480f
st: P9 - fail the future when a Promise is abandoned
merlimat Jun 30, 2026
7d972de
st: P10 - note negativeAckRedeliveryDelay is inert on StreamConsumer
merlimat Jun 30, 2026
d44f518
st: P11 - document the invalid/rejected default consumer target
merlimat Jun 30, 2026
9999a4a
st: P12 - rename ClientCore::createCheckpointAsync -> createCheckpoin…
merlimat Jun 30, 2026
db63962
st: P13 - rvalue overloads for Expected monadic ops and value_or
merlimat Jun 30, 2026
8264bcf
st: review nits N1-N7
merlimat Jun 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,42 @@ target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShar
target_link_libraries(SampleKeyValueSchemaConsumer ${CLIENT_LIBS} pulsarShared)
target_link_libraries(SampleKeyValueSchemaProducer ${CLIENT_LIBS} pulsarShared)
target_link_libraries(SampleCustomLoggerCApi ${CLIENT_LIBS} pulsarShared)

# --- Scalable topics (pulsar::st) examples ---------------------------------
# These use the new typed scalable-topics API under include/pulsar/st. Its
# implementation (lib/st) does not exist yet, so the examples are COMPILED here
# for header/API verification but are NOT linked into executables (there are no
# symbols to link against). Building this OBJECT library on every build keeps the
# examples from bit-rotting while the API is reviewed.
#
# TODO(scalable-topics): once lib/st lands, replace this with one
# add_executable + target_link_libraries(... pulsarShared) per file, exactly like
# the samples above.
# The core samples are header-only previews of the pulsar::st API and build with
# no extra dependency.
set(SAMPLE_ST_SOURCES
st/SampleStProducer.cc
st/SampleStStreamConsumer.cc
st/SampleStQueueConsumer.cc
st/SampleStCheckpointConsumer.cc
)
# reflect-cpp powers jsonSchema<T>() (reflection-based JSON SerDe + schema). It is
# optional for this API-only PR: when the package is present the JSON sample is
# added and linked against it; when absent, only that one sample is skipped. (The
# reflectcpp vcpkg port does not yet ship an Avro backend, so it is not yet wired
# into the manifest; it will be added with the lib/st implementation.)
find_package(reflectcpp CONFIG QUIET)
if (reflectcpp_FOUND)
list(APPEND SAMPLE_ST_SOURCES st/SampleStJsonSchema.cc)
endif ()

add_library(StExamples OBJECT ${SAMPLE_ST_SOURCES})
# The scalable-topics (pulsar::st) API targets C++20; the rest of the client stays
# C++17. Set the standard per-target so only this code requires C++20.
set_target_properties(StExamples PROPERTIES CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON)
# PRIVATE link gives the object sources pulsarShared's include directories; an
# OBJECT library is not itself linked, so the missing lib/st symbols are fine.
target_link_libraries(StExamples PRIVATE ${CLIENT_LIBS} pulsarShared)
if (reflectcpp_FOUND)
target_link_libraries(StExamples PRIVATE reflectcpp::reflectcpp)
endif ()
40 changes: 40 additions & 0 deletions examples/st/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Scalable Topics (`pulsar::st`) — API preview examples

These examples exercise the new typed scalable-topics C++ API under
[`include/pulsar/st/`](../../include/pulsar/st). They illustrate the proposed
surface and exist to gather community feedback.

> **Status: API definition only.** The implementation (`lib/st/`) does not exist
> yet, so these examples **compile but do not yet link**. They are wired into the
> CMake build as a compile-only `OBJECT` library (`StExamples` in
> [`examples/CMakeLists.txt`](../CMakeLists.txt)) — header-verified on every build,
> but not linked. Once `lib/st` lands they become normal `add_executable` targets.

The `pulsar::st` API requires **C++20** (the rest of the client stays C++17).
Syntax-check an example against the headers (no linking):

```sh
clang++ -std=c++20 -I ../../include -Wall -fsyntax-only SampleStProducer.cc
```

| File | Shows |
|---|---|
| `SampleStProducer.cc` | blocking + asynchronous publishing, transactions |
| `SampleStStreamConsumer.cc` | ordered (per-key) delivery, cumulative ack |
| `SampleStQueueConsumer.cc` | parallel delivery, individual ack + nack, dead-letter |
| `SampleStCheckpointConsumer.cc`| externally held position via `Checkpoint` |
| `SampleStJsonSchema.cc` | a struct as JSON with zero boilerplate (`jsonSchema<T>()`, reflect-cpp) |

## API at a glance

- **Typed builders** off one `PulsarClient`: `newProducer` / `newStreamConsumer` /
`newQueueConsumer` / `newCheckpointConsumer`, each taking a `Schema<T>`.
- **Synchronous calls return `Expected<T>`** (a stand-in for `std::expected`,
which is C++23): check it, or call `.value()` to throw `ClientException`.
`Expected<T>` is `[[nodiscard]]`, so a failure cannot be silently dropped.
- **Asynchronous calls return `Future<T>`**: `addListener(...)` to react on
completion without blocking, `get()` to block, or `co_await` it.
- **Schemas**: primitives are built in; structured types use `jsonSchema<T>()` /
`avroSchema<T>()` (reflect-cpp derives the SerDe **and** the declared schema from
the struct — no boilerplate), `protobufNativeSchema<T>()`, or a custom
`Schema<T>(serde)`. reflect-cpp is a required dependency of `pulsar::st`.
71 changes: 71 additions & 0 deletions examples/st/SampleStCheckpointConsumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Scalable-topics CheckpointConsumer: the application owns the position. Read,
// snapshot a Checkpoint, persist it externally, and later resume from it.

#include <pulsar/st/Client.h>

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

using namespace pulsar::st;

int main() {
auto clientResult = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build();
if (!clientResult) {
std::cerr << "failed to build client: " << clientResult.error() << "\n";
return 1;
}
PulsarClient client = std::move(clientResult).value();

// Restore from a previously stored checkpoint if you have one; else start at
// the earliest message. (Checkpoint::fromByteArray(savedBytes) to resume.)
auto consumerResult = client.newCheckpointConsumer(Schema<std::string>{})
.topic("topic://public/default/orders")
.startPosition(Checkpoint::earliest())
.create(); // NOTE: create(), not subscribe()
if (!consumerResult) {
std::cerr << "failed to create consumer: " << consumerResult.error() << "\n";
return 1;
}
CheckpointConsumer<std::string> consumer = std::move(consumerResult).value();

for (int i = 0; i < 5; i++) {
auto msg = consumer.receive(std::chrono::seconds(5));
if (!msg) {
if (msg.error().result == ResultTimeout) break;
std::cerr << "receive failed: " << msg.error() << "\n";
break;
}
std::cout << "read: " << msg->value() << "\n";
}

// Atomic position snapshot across all segments. Store the bytes yourself
// (Flink/Spark state backend, a file, etc.) — there is no broker-side cursor.
Checkpoint checkpoint = consumer.checkpoint();
std::vector<std::byte> persisted = checkpoint.toByteArray(); // store these bytes yourself
std::cout << "checkpoint is " << persisted.size() << " bytes\n";

(void)consumer.close();
(void)client.close();
return 0;
}
85 changes: 85 additions & 0 deletions examples/st/SampleStJsonSchema.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Passing a struct as JSON: `jsonSchema<T>()` derives both the SerDe and the
// declared schema from the struct's fields (via reflect-cpp) — NO macros, NO base
// class, NO schema string, NO serializer. Nested structs and containers included.
// `avroSchema<T>()` is identical for Avro.

#include <pulsar/st/Client.h>
#include <pulsar/st/JsonSchema.h>

#include <iostream>
#include <string>
#include <vector>

// Plain value types — that is the entire schema "declaration".
struct Address {
std::string street;
std::string city;
};
struct Order {
std::string orderId;
int quantity;
double unitPrice;
Address shipTo; // nested struct — handled automatically
std::vector<std::string> tags; // container — handled automatically
};

using namespace pulsar::st;

int main() {
auto clientResult = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build();
if (!clientResult) {
std::cerr << clientResult.error() << "\n";
return 1;
}
PulsarClient client = std::move(clientResult).value();

auto producerResult =
client.newProducer(jsonSchema<Order>()).topic("topic://public/default/orders").create();
if (!producerResult) {
std::cerr << producerResult.error() << "\n";
return 1;
}
Producer<Order> producer = std::move(producerResult).value();

Order order{"ord-1", 3, 9.99, {"1 Main St", "Springfield"}, {"priority", "gift"}};
if (auto sent = producer.send(order); sent) {
std::cout << "sent " << *sent << "\n";
}

auto consumerResult = client.newStreamConsumer(jsonSchema<Order>())
.topic("topic://public/default/orders")
.subscriptionName("orders-sub")
.subscribe();
if (consumerResult) {
StreamConsumer<Order> consumer = std::move(consumerResult).value();
if (auto msg = consumer.receive(std::chrono::seconds(5))) {
Order received = msg->value(); // decoded straight back into the struct
std::cout << received.orderId << " -> " << received.shipTo.city << "\n";
consumer.acknowledgeCumulative(msg->id());
}
(void)consumer.close();
}

(void)producer.close();
(void)client.close();
return 0;
}
94 changes: 94 additions & 0 deletions examples/st/SampleStProducer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Scalable-topics producer: blocking and asynchronous publishing.

#include <pulsar/st/Client.h>

#include <iostream>

using namespace pulsar::st;

int main() {
// One client per application; keep it for the whole lifetime.
auto clientResult = PulsarClient::builder().serviceUrl("pulsar://localhost:6650").build();
if (!clientResult) {
std::cerr << "failed to build client: " << clientResult.error() << "\n";
return 1;
}
PulsarClient client = std::move(clientResult).value();

auto producerResult = client.newProducer(Schema<std::string>{})
.topic("topic://public/default/orders")
.sendTimeout(std::chrono::seconds(30))
.create();
if (!producerResult) {
std::cerr << "failed to create producer: " << producerResult.error() << "\n";
return 1;
}
Producer<std::string> producer = std::move(producerResult).value();

// Blocking send: returns Expected<MessageId> (must be checked — [[nodiscard]]).
for (int i = 0; i < 10; i++) {
auto sent = producer.newMessage()
.key("order-" + std::to_string(i % 4)) // per-key ordering
.value("payload-" + std::to_string(i))
.property("attempt", "1")
.send();
if (sent) {
std::cout << "sent " << *sent << "\n";
} else {
std::cerr << "send failed: " << sent.error() << "\n";
}
}

// Asynchronous send: react on completion without blocking.
producer.newMessage()
.key("order-async")
.value("async-payload")
.sendAsync()
.addListener([](const Expected<MessageId>& result) {
if (result) {
std::cout << "async sent " << *result << "\n";
} else {
std::cerr << "async send failed: " << result.error() << "\n";
}
});

// Transaction: produced messages become visible atomically on commit.
if (auto txnResult = client.newTransaction()) {
Transaction txn = *txnResult;
auto a = producer.newMessage().value("tx-a").transaction(txn).send();
auto b = producer.newMessage().value("tx-b").transaction(txn).send();
if (a && b) {
if (auto committed = txn.commit(); !committed) {
std::cerr << "commit failed: " << committed.error() << "\n";
}
} else {
(void)txn.abort();
}
}

(void)producer.flush(); // await all sends issued before this call
if (auto closed = producer.close(); !closed) {
std::cerr << "close failed: " << closed.error() << "\n";
}
(void)client.close();
return 0;
}
Loading
Loading