
Commit 91c6bd7

feat: insert records in bulk via cached nonce (#111)
resolves: truflation/website#3705

**Summary by CodeRabbit (Release Notes)**

* **New Features**
  * Added a `BulkInserter` utility for high-throughput, pipelined bulk record insertion with fire-and-forget broadcasting and automatic chunking.
  * Supports configurable parameters (batch size, max inflight, max attempts) with built-in retry and backoff handling for transient errors.
* **Documentation**
  * Added a comprehensive API reference and practical examples demonstrating `BulkInserter` usage patterns and configuration guidance.
1 parent 41a1fbc commit 91c6bd7

10 files changed

Lines changed: 783 additions & 2 deletions


README.md

Lines changed: 58 additions & 0 deletions
@@ -117,6 +117,64 @@ for record in records:
    print(f"Date: {date.strftime('%Y-%m-%d')}, Value: {record['Value']}")
```

### High-Throughput Insertion with `BulkInserter`

When inserting more than a few hundred records from a single signer, looping
`client.batch_insert_records(...)` is dramatically slower than it needs to be:
each call forces a wait-for-inclusion (~1–2s per block) before the next
broadcast, so 1,000 records can take 25+ minutes.

`BulkInserter` instead caches the nonce locally and broadcasts each chunk
fire-and-forget, draining inflight transactions in batches via `WaitTx`. It
handles `invalid nonce` (resets cache, retries) and `mempool full` (backs off,
keeps cache) automatically.

```python
from trufnetwork_sdk_py import TNClient, BulkInserter, BulkInsertError

client = TNClient("https://gateway.testnet.truf.network", "YOUR_PRIVATE_KEY")
inserter = BulkInserter(client)

batches = [
    {
        "stream_id": "st...",
        "inputs": [
            {"date": 1700000000, "value": 1.5},
            # ... thousands more
        ],
    },
]

try:
    tx_hashes = inserter.insert_all(batches)
    print(f"broadcast {len(tx_hashes)} transactions")
except BulkInsertError as e:
    # e.tx_hashes — list of hashes broadcast before failure
    # e.failed_chunk_index — chunk that failed (or total chunks if drain_failure)
    # e.drain_failure — True if WaitTx failed after all broadcasts succeeded
    print(f"bulk insert failed: {e}; recovered {len(e.tx_hashes)} partial hashes")
```

**Constraints:**

- One `BulkInserter` per signer key — concurrent inserters from the same
  signer collide on nonces (the mempool admits transactions in strict nonce
  order).
- Different signers run safely in parallel (independent nonce sequences); see
  the sketch below.
- Records may mix stream IDs within a chunk — the inserter flattens batches
  and chunks by total record count.
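Because nonce sequences are independent per signer, the supported way to scale out is one inserter per key. A minimal sketch of that pattern, assuming one private key per worker; `keys_and_batches` is a hypothetical list of `(private_key, batches)` pairs, not an SDK construct:

```python
from concurrent.futures import ThreadPoolExecutor

from trufnetwork_sdk_py import TNClient, BulkInserter

def insert_for_signer(private_key, batches):
    # One client + one BulkInserter per signer key, so nonce caches never collide.
    client = TNClient("https://gateway.testnet.truf.network", private_key)
    return BulkInserter(client).insert_all(batches)

# keys_and_batches: one (private_key, batches) pair per signer.
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(insert_for_signer, key, b) for key, b in keys_and_batches]
    all_hashes = [h for f in futures for h in f.result()]
```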
**Tunables** (defaults shown):

```python
BulkInserter(
    client,
    batch_size=10,    # records per insert_records tx; protocol cap is 10
    max_inflight=200, # broadcasts queued before forced drain via WaitTx
    max_attempts=5,   # initial + retries on transient errors
)
```

## Understanding Transaction Lifecycle

**IMPORTANT:** All transaction operations return success when transactions enter the mempool, NOT when they are executed on-chain. This async behavior can cause race conditions if not handled properly.
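A minimal sketch of the safe pattern, assuming the Python client exposes `wait_for_tx` (the same drain primitive the bulk-insert example refers to); the stream ID is a placeholder:

```python
tx_hash = client.batch_insert_records(batches)  # returns at mempool admission, not execution
client.wait_for_tx(tx_hash)                     # block until the tx is actually included
records = client.get_records("st...")           # only now is the new data guaranteed visible
```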

bindings/bindings.go

Lines changed: 88 additions & 1 deletion
```diff
@@ -212,7 +212,78 @@ func InsertRecords(client *tnclient.Client, inputs []types.InsertRecordInput) (s
 	return txHash.String(), nil
 }
 
-// NewInsertRecordInput creates a new InsertRecordInput struct
+// NewBulkInserter constructs a BulkInserter wired to the given TNClient.
+// Wraps tnclient.Client.LoadBulkInserter for gopy export to Python.
+//
+// batchSize: records per insert_records tx (must be <= protocol cap of 10).
+// maxInflight: how many broadcasts may queue before draining via WaitTx.
+// maxAttempts: max attempts per chunk (initial + retries) on transient
+// errors (invalid nonce, mempool full).
+// Pass 0 for any of these to use defaults (10, 200, 5).
+func NewBulkInserter(client *tnclient.Client, batchSize int, maxInflight int, maxAttempts int) (*contractsapi.BulkInserter, error) {
+	if client == nil {
+		return nil, fmt.Errorf("client is required")
+	}
+	var opts []contractsapi.BulkInserterOption
+	if batchSize > 0 {
+		opts = append(opts, contractsapi.WithBatchSize(batchSize))
+	}
+	if maxInflight > 0 {
+		opts = append(opts, contractsapi.WithMaxInflight(maxInflight))
+	}
+	if maxAttempts > 0 {
+		opts = append(opts, contractsapi.WithMaxAttempts(maxAttempts))
+	}
+	return client.LoadBulkInserter(opts...)
+}
+
+// BulkInsertResult is the gopy-friendly return shape for BulkInsertAll.
+//
+// It always carries TxHashes (the partial hashes broadcast so far) so that
+// Python callers can recover from a failure — gopy discards a (result, error)
+// tuple's result when the error is non-nil, hence the explicit struct.
+//
+// On success: ErrorMsg is "" and the other fields are zero.
+// On failure: ErrorMsg is the formatted error string, FailedChunkIndex is
+// either the index of the failing broadcast chunk (when DrainFailure is false)
+// or the total chunks broadcast (when DrainFailure is true).
+type BulkInsertResult struct {
+	TxHashes         []string
+	ErrorMsg         string
+	DrainFailure     bool
+	FailedChunkIndex int
+}
+
+// BulkInsertAll runs InsertAll against the given inserter and returns a
+// BulkInsertResult that always carries the partial tx hashes (for recovery)
+// alongside any error info.
+func BulkInsertAll(b *contractsapi.BulkInserter, inputs []types.InsertRecordInput) *BulkInsertResult {
+	res := &BulkInsertResult{}
+	if b == nil {
+		res.ErrorMsg = "inserter is required"
+		return res
+	}
+	hashes, err := b.InsertAll(context.Background(), inputs)
+	res.TxHashes = make([]string, len(hashes))
+	for i, h := range hashes {
+		res.TxHashes[i] = h.String()
+	}
+	if err != nil {
+		res.ErrorMsg = err.Error()
+		var bie *contractsapi.BulkInsertError
+		if errors.As(err, &bie) {
+			res.DrainFailure = bie.DrainFailure
+			res.FailedChunkIndex = bie.FailedChunkIndex
+		}
+	}
+	return res
+}
+
+// NewInsertRecordInput creates a new InsertRecordInput struct.
+//
+// Resolves the data provider via GetCurrentAccount on every call. Suitable
+// for one-off inserts; for bulk paths, prefer NewInsertRecordInputForProvider
+// to avoid the per-record account lookup.
 func NewInsertRecordInput(client *tnclient.Client, streamId string, date int, val float64) types.InsertRecordInput {
 	dataProvider, err := GetCurrentAccount(client)
 	if err != nil {
```
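On the Python side, a wrapper has to turn the `BulkInsertResult` above back into an exception. The shipped wrapper lives in `src/trufnetwork_sdk_py/bulk_inserter.py`; the sketch below is a simplified illustration of that conversion, not the wrapper's actual code:

```python
class BulkInsertError(Exception):
    """Carries partial tx hashes plus failure metadata for recovery."""
    def __init__(self, msg, tx_hashes, drain_failure, failed_chunk_index):
        super().__init__(msg)
        self.tx_hashes = list(tx_hashes)
        self.drain_failure = drain_failure
        self.failed_chunk_index = failed_chunk_index

def unwrap_bulk_insert_result(result):
    # `result` is the gopy-exported BulkInsertResult; field names mirror the Go struct.
    if result.ErrorMsg:
        raise BulkInsertError(result.ErrorMsg, result.TxHashes,
                              result.DrainFailure, result.FailedChunkIndex)
    return list(result.TxHashes)
```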
```diff
@@ -228,6 +299,22 @@ func NewInsertRecordInput(client *tnclient.Client, streamId string, date int, va
 	}
 }
 
+// NewInsertRecordInputForProvider creates an InsertRecordInput with an
+// explicit data provider, skipping the per-call GetCurrentAccount lookup.
+//
+// Use this when building many inputs for a single signer (the bulk insertion
+// path): resolve the data provider once via GetCurrentAccount, then call this
+// helper for each record. Avoids redundant account RPC roundtrips and makes
+// errors loud (the caller controls validation).
+func NewInsertRecordInputForProvider(dataProvider string, streamId string, date int, val float64) types.InsertRecordInput {
+	return types.InsertRecordInput{
+		StreamId:     streamId,
+		DataProvider: dataProvider,
+		EventTime:    date,
+		Value:        val,
+	}
+}
+
 // NewGetRecordInput creates a new GetRecordInput struct
 func NewGetRecordInput(
 	client *tnclient.Client,
```
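From Python, the intended call pattern is: resolve the provider once, then build every input with the cheap helper. A sketch against the gopy-exported functions; the `bindings` import path and the `go_client`/`series` variables are illustrative assumptions:

```python
from trufnetwork_sdk_py import bindings  # hypothetical import path for the gopy module

provider = bindings.GetCurrentAccount(go_client)  # one account lookup, reused below

inputs = [
    # No per-record RPC: the provider string is passed in explicitly.
    bindings.NewInsertRecordInputForProvider(provider, stream_id, date, value)
    for date, value in series  # series: (unix_timestamp, float) pairs
]
```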

docs/api-reference.md

Lines changed: 88 additions & 0 deletions
@@ -220,6 +220,94 @@ batches = [
tx_hash = client.batch_insert_records(batches)
```

### `BulkInserter` — high-throughput pipelined insertion

When you need to push hundreds or thousands of records from a single signer,
use `BulkInserter` instead of looping `batch_insert_records`. It caches the
nonce locally and broadcasts each chunk fire-and-forget — admission (~50ms)
becomes the rate limit instead of inclusion (~1–2s per block).

#### `BulkInserter(client, batch_size=10, max_inflight=200, max_attempts=5)`

Wraps the sdk-go `BulkInserter` (see
[`sdk-go/core/contractsapi/bulk_inserter.go`](https://github.com/trufnetwork/sdk-go/blob/main/core/contractsapi/bulk_inserter.go)).
Mirrors the cached-nonce pattern from `node/extensions/tn_attestation/extension.go`
(PR `kwilteam/node#1356`), which solves the same problem on the node side.

#### Parameters

- `client: TNClient` — must use HTTP transport (the default).
- `batch_size: int = 10` — records per `insert_records` transaction. Must be ≤ the protocol cap (currently 10).
- `max_inflight: int = 200` — broadcasts queued before draining via `WaitTx`.
- `max_attempts: int = 5` — initial attempt + retries per chunk on transient errors (`invalid nonce`, `mempool full`).

#### `inserter.insert_all(batches: List[RecordBatch]) -> List[str]`

Flattens `batches` into a single record list, chunks by `batch_size`,
broadcasts each chunk pipelined, and drains every `max_inflight` broadcasts,
plus once at the end. Returns the tx hashes in submission order.

Records may mix stream IDs within a chunk — the inserter chunks by total record
count, not by stream.
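For sizing a run, the chunk and drain counts fall out directly. A worked example with illustrative numbers (the 17,000-record figure comes from the bulk-insert example README; this is arithmetic, not SDK code):

```python
import math

num_records = 17_000  # e.g. one Truflation CPI ingestor run
batch_size = 10       # protocol cap
max_inflight = 200

chunks = math.ceil(num_records / batch_size)  # 1,700 insert_records txs
drains = math.ceil(chunks / max_inflight)     # 9 WaitTx drain rounds, final drain included
print(f"{chunks} txs, {drains} drains")
```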
253+
#### Raises
254+
255+
`BulkInsertError` — chunk failed after exhausting retries. The exception carries:
256+
257+
- `tx_hashes: List[str]` — tx hashes broadcast successfully before the failure
258+
- `drain_failure: bool``True` when all chunks broadcast but the final `WaitTx` failed
259+
- `failed_chunk_index: int` — index of the failing chunk (broadcast failure) or total chunks broadcast (drain failure)
260+
261+
Use these to recover: when `drain_failure` is `False`, resume from
262+
`records[failed_chunk_index * batch_size:]` after fixing the underlying issue.
263+
When `drain_failure` is `True`, all hashes are in `tx_hashes` — investigate
264+
inclusion separately (the broadcast itself succeeded).
265+
266+
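A recovery sketch built on those fields. Here `inserter` and `batches` are as in the example below, and `records` stands for the flattened record list in submission order; the resume step is illustrative, not an SDK helper:

```python
from trufnetwork_sdk_py import BulkInsertError

BATCH_SIZE = 10  # must match the inserter's batch_size

try:
    tx_hashes = inserter.insert_all(batches)
except BulkInsertError as e:
    if e.drain_failure:
        # Every chunk was broadcast; verify inclusion of e.tx_hashes out of band.
        unconfirmed = e.tx_hashes
    else:
        # Chunks before failed_chunk_index were admitted; retry only the tail.
        remaining = records[e.failed_chunk_index * BATCH_SIZE:]
        # ...fix the underlying issue, then re-submit `remaining` in a fresh insert_all call.
```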
#### Example

```python
from trufnetwork_sdk_py import TNClient, BulkInserter, BulkInsertError

client = TNClient("https://gateway.testnet.truf.network", "YOUR_PRIVATE_KEY")
inserter = BulkInserter(client)

batches = [
    {
        "stream_id": "st...",
        "inputs": [
            {"date": 1700000000, "value": 1.5},
            # ... thousands more
        ],
    },
]

try:
    tx_hashes = inserter.insert_all(batches)
    print(f"broadcast {len(tx_hashes)} transactions")
except BulkInsertError as e:
    print(f"bulk insert failed: {e}")
    print(f"  partial hashes: {len(e.tx_hashes)}")
    print(f"  failed at chunk: {e.failed_chunk_index}")
    print(f"  drain failure: {e.drain_failure}")
    # If not e.drain_failure, resume from records[e.failed_chunk_index * 10:]
```

#### Constraints

- **One BulkInserter per signer key.** The cache is per-instance; concurrent
  inserters from the same signer collide on nonces because the mempool admits
  transactions strictly in nonce order.
- **Sequential per signer, not concurrent.** Out-of-order HTTP arrival from
  one signer triggers `invalid nonce` rejections; the helper is single-threaded
  by design.
- **Different signers run in parallel.** Per-signer nonces are independent.

#### Working example

See [`examples/bulk_insert_example`](../examples/bulk_insert_example) for a
full lifecycle demo: connect → drop existing → deploy → bulk-insert → fetch +
verify → drop.

## Stream Querying

### `client.get_records(stream_id: str, **kwargs) -> List[Dict]`
examples/bulk_insert_example/README.md

Lines changed: 101 additions & 0 deletions

@@ -0,0 +1,101 @@
# Bulk Insert Example (Python)

Demonstrates `BulkInserter` — pipelined high-throughput record insertion that
keeps a single signer within the protocol's 10-row-per-tx cap while
broadcasting hundreds of transactions per minute.

## What it does

1. Connects to a local TN node with the dev private key
2. Generates a stream ID and best-effort drops any existing stream with that ID
3. Deploys a fresh primitive stream
4. Bulk-inserts 25 synthetic records via `BulkInserter` (3 chunks of 10/10/5)
5. Reads the records back and confirms count + values
6. Drops the test stream

## Why use BulkInserter

Calling `client.batch_insert_records(...)` in a loop forces the SDK to wait for
each transaction to be **included in a block** (~1–2s per call) before
broadcasting the next. For 1,000 records that's 25+ minutes; for the Truflation
CPI ingestor's ~17,000-record runs, it's 4–5 hours.

`BulkInserter` instead:

- Caches the nonce locally (one initial fetch, then increments)
- Broadcasts each chunk fire-and-forget (`wait=False` underneath) — admission
  takes ~50ms versus inclusion's 1–2s
- Drains inflight hashes in batches via `wait_for_tx`
- Retries automatically on `invalid nonce` (resets the cache and refetches)
  and `mempool full` (backs off, keeps the cache)

Result: 1,000 records land in roughly one minute on a typical node, instead of
half an hour.

## Prerequisites

A local node with migrations applied + the dev key whitelisted. From the
`node` repo:

```bash
task single:start                                    # spin up postgres + tn-db
PATH="$(pwd)/.build:$PATH" task action:migrate:dev   # apply migrations + grant network_writer
```

The dev key is `0000000000000000000000000000000000000000000000000000000000000001`,
which derives to address `0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf`. Both
`task action:migrate:dev` and the `single:start` defaults wire this address as
the DB owner and grant it `system:network_writer`.

## Running

From the `sdk-py` repo:

```bash
source .venv/bin/activate
python examples/bulk_insert_example/main.py
```

## Expected output

```text
connected as 0x7e5f4552091a69125d5dfcb7b8c2659029395bdf
stream id: stbulkinsertxxxxxxxxxxxxxxxxxxxxx
(no existing stream to drop: ...)
stream deployed (tx 0x...)
broadcasting 25 records via BulkInserter (batch_size=10)...
done: 3 chunks broadcast + drained in 1.05s (350ms/chunk avg)

First 3 records read back:
EventTime=1704067200 Value=1.000000000000000000
EventTime=1704153600 Value=2.000000000000000000
EventTime=1704240000 Value=3.000000000000000000
...
Total verified: 25 records
dropped existing stream (tx 0x...)
```
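The core of `main.py`, reduced to the insert-and-verify steps. Stream deployment and teardown are elided, `stream_id` is a placeholder for the deployed stream, and the local node URL is a stand-in; the timestamps match the expected output above:

```python
import math

from trufnetwork_sdk_py import TNClient, BulkInserter

NUM_RECORDS = 25
DAY = 86_400
START = 1_704_067_200  # 2024-01-01 UTC, first EventTime in the expected output

DEV_PRIVATE_KEY = "00" * 31 + "01"  # the dev key from Prerequisites
client = TNClient("http://localhost:8484", DEV_PRIVATE_KEY)  # local node URL is a stand-in
inserter = BulkInserter(client)

stream_id = "st..."  # produced by the (elided) deploy step

records = [{"date": START + i * DAY, "value": float(i + 1)} for i in range(NUM_RECORDS)]
tx_hashes = inserter.insert_all([{"stream_id": stream_id, "inputs": records}])
assert len(tx_hashes) == math.ceil(NUM_RECORDS / 10)  # 3 chunks: 10 / 10 / 5

rows = client.get_records(stream_id)
assert len(rows) == NUM_RECORDS
```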
## Customizing

- **Larger workloads**: change `NUM_RECORDS` at the top of `main.py`. Chunks =
  `ceil(NUM_RECORDS / 10)`.
- **Throughput knobs**: pass kwargs to `BulkInserter`:

  ```python
  inserter = BulkInserter(
      client,
      batch_size=10,    # records per insert_records tx; protocol cap is 10
      max_inflight=500, # broadcasts queued before forced drain
      max_attempts=5,   # initial + retries on transient errors
  )
  ```

- **Testnet/mainnet**: change `TEST_PROVIDER_URL` and the private key. Note
  that the account must hold `system:network_writer` to deploy streams.

## Related

- Python wrapper source: [`src/trufnetwork_sdk_py/bulk_inserter.py`](../../src/trufnetwork_sdk_py/bulk_inserter.py)
- Underlying Go implementation: [`sdk-go/core/contractsapi/bulk_inserter.go`](https://github.com/trufnetwork/sdk-go/blob/main/core/contractsapi/bulk_inserter.go)
- Pattern reference: [`tn_attestation/extension.go`](https://github.com/trufnetwork/node/blob/main/extensions/tn_attestation/extension.go)
  in the node repo (PR #1356) — the same cached-nonce design that solved the
  attestation cron's "invalid nonce" noise.
