-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(api) start blob sub from height instead of tip of the chain #4864
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
51a54e9
4b1e926
42db254
74ed8f7
66175f1
af30b02
c8f9bcf
be269cd
f0e01fa
67e2734
b679f68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ import ( | |
| "errors" | ||
| "fmt" | ||
| "slices" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
|
|
@@ -110,9 +111,8 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan | |
| return nil, fmt.Errorf("service has not been started") | ||
| } | ||
|
|
||
| log.Infow("subscribing for blobs", | ||
| "namespaces", ns.String(), | ||
| ) | ||
| log.Infow("subscribing for blobs", "namespace", ns.String()) | ||
|
|
||
| headerCh, err := s.headerSub(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -121,61 +121,204 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan | |
| blobCh := make(chan *SubscriptionResponse, 16) | ||
| go func() { | ||
| defer close(blobCh) | ||
| s.streamBlobs(ctx, blobCh, headerCh, ns, 0) | ||
| }() | ||
| return blobCh, nil | ||
| } | ||
|
|
||
| for { | ||
| select { | ||
| case header, ok := <-headerCh: | ||
| if ctx.Err() != nil { | ||
| log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID()) | ||
| return | ||
| } | ||
| if !ok { | ||
| log.Errorw("header channel closed for subscription", "namespace", ns.ID()) | ||
| return | ||
| } | ||
| // close subscription before buffer overflows | ||
| if len(blobCh) == cap(blobCh) { | ||
| log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID()) | ||
| return | ||
| } | ||
| // SubscribeWithStartHeight returns a channel that will receive SubscriptionResponse objects | ||
| // starting from the given startHeight. It first replays all historical blobs from startHeight | ||
| // up to the current network head (catchup), then seamlessly transitions to streaming | ||
| // new blobs as they arrive (live). | ||
| // | ||
| // Retrieval errors during catchup (e.g. pruned data) will close the subscription. | ||
| // During the live phase, retrieval errors are retried until successful. | ||
| // The channel will be closed when the context is canceled or the service is stopped. | ||
| // Not reading from the returned channel will cause the stream to close after 16 messages. | ||
| func (s *Service) SubscribeWithStartHeight( | ||
| ctx context.Context, | ||
| ns libshare.Namespace, | ||
| startHeight uint64, | ||
| ) (<-chan *SubscriptionResponse, error) { | ||
| if s.ctx == nil { | ||
| return nil, fmt.Errorf("service has not been started") | ||
| } | ||
|
|
||
| var blobs []*Blob | ||
| var err error | ||
| for { | ||
| blobs, err = s.getAll(ctx, header, []libshare.Namespace{ns}) | ||
| if ctx.Err() != nil { | ||
| // context canceled, continuing would lead to unexpected missed heights for the client | ||
| log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID()) | ||
| return | ||
| } | ||
| if err == nil { | ||
| // operation successful, break the loop | ||
| break | ||
| } | ||
| } | ||
| // verify that startHeight is available | ||
| if _, err := s.headerGetter(ctx, startHeight); err != nil { | ||
| if isHeightUnavailable(err) { | ||
| return nil, fmt.Errorf("startHeight %d is not yet available on the network: %w", startHeight, err) | ||
| } | ||
| return nil, fmt.Errorf("failed to fetch header at startHeight %d: %w", startHeight, err) | ||
| } | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| log.Debugw("blobsub: pending response canceled due to user ctx closing", "namespace", ns.ID()) | ||
| return | ||
| case blobCh <- &SubscriptionResponse{ | ||
| Blobs: blobs, | ||
| Height: header.Height(), | ||
| Header: &header.RawHeader, | ||
| }: | ||
| log.Infow("subscribing for blobs with start height", | ||
| "namespace", ns.String(), | ||
| "startHeight", startHeight, | ||
| ) | ||
|
|
||
| blobCh := make(chan *SubscriptionResponse, 16) | ||
|
ninabarbakadze marked this conversation as resolved.
Outdated
|
||
| go func() { | ||
| defer close(blobCh) | ||
|
|
||
| // catchup: fetch blobs for historical heights. | ||
| // retrieval errors are not retried — historical data that isn't available | ||
| // was pruned won't heal on retry. | ||
| lastHeight := uint64(0) | ||
| for height := startHeight; ; height++ { | ||
| if ctx.Err() != nil || s.ctx.Err() != nil { | ||
| log.Debugw("blobsub: canceling catchup due to user or service ctx closing", "namespace", ns.ID()) | ||
| return | ||
| } | ||
|
|
||
| header, err := s.headerGetter(ctx, height) | ||
| if err != nil { | ||
| if isHeightUnavailable(err) { | ||
| // height not yet available: catchup is done, transition to live. | ||
| log.Debugw("blobsub: catchup done, transitioning to live", | ||
| "namespace", ns.ID(), "lastHeight", height-1) | ||
| break | ||
| } | ||
| log.Errorw("blobsub: error during catchup header fetch", | ||
| "namespace", ns.ID(), "height", height, "err", err) | ||
| return | ||
| } | ||
|
|
||
| blobs, err := s.getAll(ctx, header, []libshare.Namespace{ns}) | ||
| if err != nil { | ||
| log.Errorw("blobsub: failed to retrieve blobs during catchup", | ||
| "namespace", ns.ID(), "height", height, "err", err) | ||
| return | ||
| } | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID()) | ||
| return | ||
| case <-s.ctx.Done(): | ||
| log.Debugw("blobsub: canceling subscription due to service ctx closing", "namespace", ns.ID()) | ||
| return | ||
| case blobCh <- &SubscriptionResponse{Blobs: blobs, Height: header.Height(), Header: &header.RawHeader}: | ||
|
ninabarbakadze marked this conversation as resolved.
Outdated
|
||
| } | ||
| lastHeight = height | ||
| } | ||
|
|
||
| // start the header subscription after catchup completes rather than before. | ||
| // starting before catchup wastes the gossipsub buffer (~32 headers) which | ||
| // fills and drops headers during long catchups. | ||
| headerCh, err := s.headerSub(ctx) | ||
| if err != nil { | ||
| log.Errorw("blobsub: failed to start header subscription after catchup", | ||
| "namespace", ns.ID(), "err", err) | ||
| return | ||
| } | ||
|
|
||
| s.streamBlobs(ctx, blobCh, headerCh, ns, lastHeight+1) | ||
| }() | ||
| return blobCh, nil | ||
| } | ||
|
|
||
| // isHeightUnavailable returns true if the error from headerGetter indicates the | ||
| // requested height simply doesn't exist yet (as opposed to a transient or internal error). | ||
| // This matches the error strings returned by nodebuilder/header.Service.GetByHeight. | ||
| func isHeightUnavailable(err error) bool { | ||
| msg := err.Error() | ||
| return strings.Contains(msg, "from the future") || strings.Contains(msg, "syncing in progress") | ||
| } | ||
|
|
||
| // streamBlobs reads headers from headerCh and streams blobs for the given namespace | ||
| // on blobCh. expectedNextHeight is the next height the caller expects; if a live | ||
| // header is ahead of it, a small catchup fills the gap via headerGetter first. | ||
| // Retrieval errors during live streaming are retried indefinitely. | ||
| func (s *Service) streamBlobs( | ||
| ctx context.Context, | ||
| blobCh chan *SubscriptionResponse, | ||
| headerCh <-chan *header.ExtendedHeader, | ||
| ns libshare.Namespace, | ||
| expectedNextHeight uint64, | ||
| ) { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-s.ctx.Done(): | ||
| return | ||
| case header, ok := <-headerCh: | ||
| if ctx.Err() != nil || s.ctx.Err() != nil { | ||
| return | ||
| } | ||
| if !ok { | ||
| log.Errorw("blobsub: header channel closed unexpectedly", "namespace", ns.ID()) | ||
| return | ||
| } | ||
| if header.Height() < expectedNextHeight { | ||
| continue | ||
| } | ||
|
|
||
| // if the live header is ahead of expectedNextHeight, fill the gap. | ||
| // expectedNextHeight == 0 means no catchup was done (plain Subscribe), | ||
| // so no gap-filling is needed. | ||
| for ; expectedNextHeight > 0 && expectedNextHeight < header.Height(); expectedNextHeight++ { | ||
| if ctx.Err() != nil || s.ctx.Err() != nil { | ||
| return | ||
| } | ||
| gapHeader, err := s.headerGetter(ctx, expectedNextHeight) | ||
| if err != nil { | ||
| log.Errorw("blobsub: failed to fill gap between catchup and live", | ||
| "namespace", ns.ID(), "height", expectedNextHeight, "err", err) | ||
| return | ||
| } | ||
| if !s.fetchAndSendBlobs(ctx, blobCh, gapHeader, ns) { | ||
| return | ||
| } | ||
| } | ||
|
|
||
| if !s.fetchAndSendBlobs(ctx, blobCh, header, ns) { | ||
| return | ||
| } | ||
| expectedNextHeight = header.Height() + 1 | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // fetchAndSendBlobs fetches blobs for the given header and namespace, retrying on error, | ||
| // and sends the response on blobCh. Returns false if the caller should stop. | ||
| func (s *Service) fetchAndSendBlobs( | ||
| ctx context.Context, | ||
| blobCh chan *SubscriptionResponse, | ||
| header *header.ExtendedHeader, | ||
| ns libshare.Namespace, | ||
| ) bool { | ||
| if len(blobCh) == cap(blobCh) { | ||
| log.Warnw("blobsub: closing subscription, buffer full from slow reader", | ||
| "namespace", ns.ID()) | ||
| return false | ||
| } | ||
|
|
||
| var blobs []*Blob | ||
| var err error | ||
| for { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tight infinite retry loop with no backoff or logging. If
This is pre-existing behavior from the original
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. backoff + debug 67e2734 |
||
| blobs, err = s.getAll(ctx, header, []libshare.Namespace{ns}) | ||
| if ctx.Err() != nil || s.ctx.Err() != nil { | ||
| return false | ||
| } | ||
| if err == nil { | ||
| break | ||
| } | ||
| } | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return false | ||
| case <-s.ctx.Done(): | ||
| return false | ||
| case blobCh <- &SubscriptionResponse{ | ||
| Blobs: blobs, | ||
| Height: header.Height(), | ||
| Header: &header.RawHeader, | ||
| }: | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| // Submit sends PFB transaction and reports the height at which it was included. | ||
| // Allows sending multiple Blobs atomically synchronously. | ||
| // Uses default wallet registered on the Node. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.