Skip to content
Open
231 changes: 187 additions & 44 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -110,9 +111,8 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan
return nil, fmt.Errorf("service has not been started")
}

log.Infow("subscribing for blobs",
"namespaces", ns.String(),
)
log.Infow("subscribing for blobs", "namespace", ns.String())

headerCh, err := s.headerSub(ctx)
if err != nil {
return nil, err
Expand All @@ -121,61 +121,204 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan
blobCh := make(chan *SubscriptionResponse, 16)
go func() {
defer close(blobCh)
s.streamBlobs(ctx, blobCh, headerCh, ns, 0)
}()
return blobCh, nil
}

for {
select {
case header, ok := <-headerCh:
if ctx.Err() != nil {
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
}
if !ok {
log.Errorw("header channel closed for subscription", "namespace", ns.ID())
return
}
// close subscription before buffer overflows
if len(blobCh) == cap(blobCh) {
log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID())
return
}
// SubscribeWithStartHeight returns a channel that will receive SubscriptionResponse objects
// starting from the given startHeight. It first replays all historical blobs from startHeight
// up to the current network head (catchup), then seamlessly transitions to streaming
// new blobs as they arrive (live).
//
// Retrieval errors during catchup (e.g. pruned data) will close the subscription.
// During the live phase, retrieval errors are retried until successful.
// The channel will be closed when the context is canceled or the service is stopped.
// Not reading from the returned channel will cause the stream to close after 16 messages.
func (s *Service) SubscribeWithStartHeight(
Comment thread
ninabarbakadze marked this conversation as resolved.
Outdated
ctx context.Context,
ns libshare.Namespace,
startHeight uint64,
) (<-chan *SubscriptionResponse, error) {
if s.ctx == nil {
return nil, fmt.Errorf("service has not been started")
}

var blobs []*Blob
var err error
for {
blobs, err = s.getAll(ctx, header, []libshare.Namespace{ns})
if ctx.Err() != nil {
// context canceled, continuing would lead to unexpected missed heights for the client
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
}
if err == nil {
// operation successful, break the loop
break
}
}
// verify that startHeight is available
if _, err := s.headerGetter(ctx, startHeight); err != nil {
if isHeightUnavailable(err) {
return nil, fmt.Errorf("startHeight %d is not yet available on the network: %w", startHeight, err)
}
return nil, fmt.Errorf("failed to fetch header at startHeight %d: %w", startHeight, err)
}

select {
case <-ctx.Done():
log.Debugw("blobsub: pending response canceled due to user ctx closing", "namespace", ns.ID())
return
case blobCh <- &SubscriptionResponse{
Blobs: blobs,
Height: header.Height(),
Header: &header.RawHeader,
}:
log.Infow("subscribing for blobs with start height",
"namespace", ns.String(),
"startHeight", startHeight,
)

blobCh := make(chan *SubscriptionResponse, 16)
Comment thread
ninabarbakadze marked this conversation as resolved.
Outdated
go func() {
defer close(blobCh)

// catchup: fetch blobs for historical heights.
// retrieval errors are not retried — historical data that isn't available
// was pruned won't heal on retry.
lastHeight := uint64(0)
for height := startHeight; ; height++ {
if ctx.Err() != nil || s.ctx.Err() != nil {
log.Debugw("blobsub: canceling catchup due to user or service ctx closing", "namespace", ns.ID())
return
}

header, err := s.headerGetter(ctx, height)
if err != nil {
if isHeightUnavailable(err) {
// height not yet available: catchup is done, transition to live.
log.Debugw("blobsub: catchup done, transitioning to live",
"namespace", ns.ID(), "lastHeight", height-1)
break
}
log.Errorw("blobsub: error during catchup header fetch",
"namespace", ns.ID(), "height", height, "err", err)
return
}

blobs, err := s.getAll(ctx, header, []libshare.Namespace{ns})
if err != nil {
log.Errorw("blobsub: failed to retrieve blobs during catchup",
"namespace", ns.ID(), "height", height, "err", err)
return
}

select {
case <-ctx.Done():
log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID())
return
case <-s.ctx.Done():
log.Debugw("blobsub: canceling subscription due to service ctx closing", "namespace", ns.ID())
return
case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height(), Header: &header.RawHeader}:
Comment thread
ninabarbakadze marked this conversation as resolved.
Outdated
}
lastHeight = height
}

// start the header subscription after catchup completes rather than before.
// starting before catchup wastes the gossipsub buffer (~32 headers) which
// fills and drops headers during long catchups.
headerCh, err := s.headerSub(ctx)
if err != nil {
log.Errorw("blobsub: failed to start header subscription after catchup",
"namespace", ns.ID(), "err", err)
return
}

s.streamBlobs(ctx, blobCh, headerCh, ns, lastHeight+1)
}()
return blobCh, nil
}

// isHeightUnavailable reports whether the error from headerGetter means the
// requested height simply doesn't exist yet (as opposed to a transient or internal error).
// This matches the error strings returned by nodebuilder/header.Service.GetByHeight.
func isHeightUnavailable(err error) bool {
	msg := err.Error()
	for _, marker := range []string{"from the future", "syncing in progress"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}

// streamBlobs reads headers from headerCh and streams blobs for the given namespace
// on blobCh. expectedNextHeight is the next height the caller expects; if a live
// header is ahead of it, a small catchup fills the gap via headerGetter first.
// Retrieval errors during live streaming are retried indefinitely.
func (s *Service) streamBlobs(
	ctx context.Context,
	blobCh chan *SubscriptionResponse,
	headerCh <-chan *header.ExtendedHeader,
	ns libshare.Namespace,
	expectedNextHeight uint64,
) {
	for {
		var (
			hdr *header.ExtendedHeader
			ok  bool
		)
		select {
		case <-ctx.Done():
			return
		case <-s.ctx.Done():
			return
		case hdr, ok = <-headerCh:
		}
		if ctx.Err() != nil || s.ctx.Err() != nil {
			return
		}
		if !ok {
			log.Errorw("blobsub: header channel closed unexpectedly", "namespace", ns.ID())
			return
		}
		// skip headers already delivered (e.g. replays below the expected height).
		if hdr.Height() < expectedNextHeight {
			continue
		}

		// if the live header is ahead of expectedNextHeight, fill the gap.
		// expectedNextHeight == 0 means no catchup was done (plain Subscribe),
		// so no gap-filling is needed.
		for expectedNextHeight > 0 && expectedNextHeight < hdr.Height() {
			if ctx.Err() != nil || s.ctx.Err() != nil {
				return
			}
			gapHeader, err := s.headerGetter(ctx, expectedNextHeight)
			if err != nil {
				log.Errorw("blobsub: failed to fill gap between catchup and live",
					"namespace", ns.ID(), "height", expectedNextHeight, "err", err)
				return
			}
			if !s.fetchAndSendBlobs(ctx, blobCh, gapHeader, ns) {
				return
			}
			expectedNextHeight++
		}

		if !s.fetchAndSendBlobs(ctx, blobCh, hdr, ns) {
			return
		}
		expectedNextHeight = hdr.Height() + 1
	}
}

// fetchAndSendBlobs fetches blobs for the given header and namespace, retrying on error,
// and sends the response on blobCh. Returns false if the caller should stop.
func (s *Service) fetchAndSendBlobs(
ctx context.Context,
blobCh chan *SubscriptionResponse,
header *header.ExtendedHeader,
ns libshare.Namespace,
) bool {
if len(blobCh) == cap(blobCh) {
log.Warnw("blobsub: closing subscription, buffer full from slow reader",
"namespace", ns.ID())
return false
}

var blobs []*Blob
var err error
for {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tight infinite retry loop with no backoff or logging.

If getAll consistently fails (e.g. network partition), this spins at 100% CPU with no visibility. Consider:

  1. Logging the error (at least at Debug level)
  2. Adding a short backoff (time.After(100ms) or exponential)

This is pre-existing behavior from the original Subscribe, but worth fixing now that it's extracted into a shared helper.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

backoff + debug 67e2734

blobs, err = s.getAll(ctx, header, []libshare.Namespace{ns})
if ctx.Err() != nil || s.ctx.Err() != nil {
return false
}
if err == nil {
break
}
}

select {
case <-ctx.Done():
return false
case <-s.ctx.Done():
return false
case blobCh <- &SubscriptionResponse{
Blobs: blobs,
Height: header.Height(),
Header: &header.RawHeader,
}:
return true
}
}

// Submit sends PFB transaction and reports the height at which it was included.
// Allows sending multiple Blobs atomically synchronously.
// Uses default wallet registered on the Node.
Expand Down
Loading
Loading