feat(api): start blob sub from height instead of tip of the chain #4864
ninabarbakadze wants to merge 8 commits into celestiaorg:main
Conversation
Codecov Report ❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #4864      +/-   ##
==========================================
- Coverage   44.83%   35.42%    -9.42%
==========================================
  Files         265      313       +48
  Lines       14620    21791     +7171
==========================================
+ Hits         6555     7719     +1164
- Misses       7313    13058     +5745
- Partials      752     1014      +262
```

☔ View full report in Codecov by Sentry.
blob/service.go (Outdated)

```go
// During the live phase, retrieval errors are retried until successful.
// The channel will be closed when the context is canceled or the service is stopped.
// Not reading from the returned channel will cause the stream to close after 16 messages.
func (s *Service) SubscribeWithStartHeight(
```
still in draft sir pls
blob/service.go (Outdated)

```go
	"startHeight", startHeight,
)

blobCh := make(chan *SubscriptionResponse, 16)
```
we might want to increase this buffer size as the catchup blobs will arrive much faster than live subscription. I'll defer to the reviewer on this.
walldiss left a comment
Nice work overall — the catchup→live design is solid and the code is well-structured. A few issues to address, one of which is a bug in the gap-fill path.
```go
if err != nil {
	log.Errorw("blobsub: failed to fill gap between catchup and live",
		"namespace", ns.ID(), "height", expectedNextHeight, "err", err)
	return
```
Bug: gap-fill treats all headerGetter errors as fatal, but "syncing in progress" is transient.
Scenario:
- Catchup processes heights 1..100; height 101 returns "syncing in progress" (store head is 100, syncer head is 200)
- `isBeyondTip` returns true, catchup ends with `expectedNextHeight = 101`
- `headerSub` starts, first live header arrives at height 200
- Gap-fill calls `headerGetter(ctx, 101)`, but the store still hasn't synced to 101
- Returns "syncing in progress" again → subscription permanently closed
This needs a retry with backoff when `isBeyondTip(err)` is true, similar to:

```go
gapHeader, err := s.headerGetter(ctx, expectedNextHeight)
if err != nil {
	if isBeyondTip(err) {
		// store hasn't caught up yet, wait and retry
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
			continue
		}
	}
	// ...
}
```

Alternatively, consider using a WaitForHeight-style call that blocks until the header is available.
```go
	return
case <-s.ctx.Done():
	return
case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height(), Header: &header.RawHeader}:
```
No overflow check during catchup sends — inconsistent with live phase.
During live streaming (line 263), the subscription is closed when the buffer is full. But here during catchup, the send is blocking — a slow reader will stall the goroutine indefinitely rather than closing.
This might be intentional (catchup data is already available, so blocking is reasonable), but the behavior difference should be documented in the godoc. Currently the doc says "Not reading from the returned channel during live subscription will cause the stream to close after 16 messages" — add a note that during catchup, sends are blocking.
Yes, this was intentional because catchup will be much faster than live subscription so I'll document it.
```go
// During the live phase, retrieval errors are retried until successful.
// The channel will be closed when the context is canceled or the service is stopped.
// Not reading from the returned channel during live subscription will cause the stream
// to close after 16 messages.
```
Godoc should document recovery pattern.
The docs explain the overflow behavior but not how users should handle it. Since SubscriptionResponse includes Height, clients can track the last received height and re-subscribe from lastHeight + 1. Worth adding a note like:
```go
// If the channel is closed due to buffer overflow, clients can re-subscribe
// from the last received height + 1 to resume without gaps.
```
```go
// height is beyond the current network or local head.
func isBeyondTip(err error) bool {
	msg := err.Error()
	return strings.Contains(msg, "from the future") || strings.Contains(msg, "syncing in progress")
```
Fragile string matching — will silently break if error messages change.
This is coupled to specific error strings in nodebuilder/header/service.go. If those messages are ever changed, this breaks silently and catchup will treat "beyond tip" as a fatal error.
Ideal fix: export sentinel errors or error types from the header package that can be matched with errors.Is/errors.As.
Minimal fix: add a comment noting the coupling, e.g.:

```go
// NOTE: coupled to error messages in nodebuilder/header/service.go:107,132
```
yes! I was going to ask if we wanted to add an error type directly in the header package (IIRC this is where headerGetter lives)
```go
) bool {
	var blobs []*Blob
	var err error
	for {
```
Tight infinite retry loop with no backoff or logging.
If getAll consistently fails (e.g. network partition), this spins at 100% CPU with no visibility. Consider:
- Logging the error (at least at Debug level)
- Adding a short backoff (`time.After(100ms)` or exponential)
This is pre-existing behavior from the original Subscribe, but worth fixing now that it's extracted into a shared helper.
```go
		"namespace", ns.ID(), "height", expectedNextHeight, "err", err)
	return
}
if !s.fetchAndSendBlobs(ctx, blobCh, gapHeader, ns) {
```
Gap-fill sends via fetchAndSendBlobs bypass the buffer overflow check.
The overflow check at line 263 runs once per live header, but this loop can emit multiple gap-fill responses before reaching it. If the gap is large and the reader is slow, the buffer could fill beyond the check. Consider adding an overflow check inside this loop as well.
Overflow check meaning that we disconnect if the buffer is full? I wouldn't disconnect during gap-fill because, similar to catch-up, the buffer might fill up quickly because of fast writes and slower reads.
fixes #4716
fixes https://linear.app/celestia/issue/PROTOCO-931/api-start-blob-sub-from-height-instead-of-tip-of-the-chain