diff --git a/blob/mocks/header_service.go b/blob/mocks/header_service.go new file mode 100644 index 000000000..f45715de2 --- /dev/null +++ b/blob/mocks/header_service.go @@ -0,0 +1,66 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/celestiaorg/celestia-node/blob (interfaces: HeaderService) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + header "github.com/celestiaorg/celestia-node/header" + gomock "github.com/golang/mock/gomock" +) + +// MockHeaderService is a mock of HeaderService interface. +type MockHeaderService struct { + ctrl *gomock.Controller + recorder *MockHeaderServiceMockRecorder +} + +// MockHeaderServiceMockRecorder is the mock recorder for MockHeaderService. +type MockHeaderServiceMockRecorder struct { + mock *MockHeaderService +} + +// NewMockHeaderService creates a new mock instance. +func NewMockHeaderService(ctrl *gomock.Controller) *MockHeaderService { + mock := &MockHeaderService{ctrl: ctrl} + mock.recorder = &MockHeaderServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHeaderService) EXPECT() *MockHeaderServiceMockRecorder { + return m.recorder +} + +// GetByHeight mocks base method. +func (m *MockHeaderService) GetByHeight(arg0 context.Context, arg1 uint64) (*header.ExtendedHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetByHeight", arg0, arg1) + ret0, _ := ret[0].(*header.ExtendedHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetByHeight indicates an expected call of GetByHeight. +func (mr *MockHeaderServiceMockRecorder) GetByHeight(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByHeight", reflect.TypeOf((*MockHeaderService)(nil).GetByHeight), arg0, arg1) +} + +// WaitForHeight mocks base method. +func (m *MockHeaderService) WaitForHeight(arg0 context.Context, arg1 uint64) (*header.ExtendedHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForHeight", arg0, arg1) + ret0, _ := ret[0].(*header.ExtendedHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WaitForHeight indicates an expected call of WaitForHeight. +func (mr *MockHeaderServiceMockRecorder) WaitForHeight(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForHeight", reflect.TypeOf((*MockHeaderService)(nil).WaitForHeight), arg0, arg1) +} diff --git a/blob/service.go b/blob/service.go index 89c3ef23e..77c3a73fa 100644 --- a/blob/service.go +++ b/blob/service.go @@ -31,6 +31,9 @@ import ( "github.com/celestiaorg/celestia-node/state" ) +// subscriptionBufferSize defines the size of the buffer for blob subscription channels. +const subscriptionBufferSize = 16 + var ( ErrBlobNotFound = errors.New("blob: not found") ErrInvalidProof = errors.New("blob: invalid proof") @@ -50,6 +53,12 @@ type Submitter interface { SubmitPayForBlob(context.Context, []*libshare.Blob, *state.TxConfig) (*types.TxResponse, error) } +// HeaderService provides header access for the blob service. +type HeaderService interface { + GetByHeight(context.Context, uint64) (*header.ExtendedHeader, error) + WaitForHeight(context.Context, uint64) (*header.ExtendedHeader, error) +} + type Service struct { // ctx represents the Service's lifecycle context. ctx context.Context @@ -58,8 +67,8 @@ type Service struct { blobSubmitter Submitter // shareGetter retrieves the EDS to fetch all shares from the requested header. shareGetter shwap.Getter - // headerGetter fetches header by the provided height - headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error) + // headerServ provides header fetching and waiting capabilities. + headerServ HeaderService // headerSub subscribes to new headers to supply to blob subscriptions. headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error) // metrics tracks blob-related metrics @@ -69,15 +78,14 @@ type Service struct { func NewService( submitter Submitter, getter shwap.Getter, - headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error), + headerServ HeaderService, headerSub func(ctx context.Context) (<-chan *header.ExtendedHeader, error), ) *Service { return &Service{ blobSubmitter: submitter, shareGetter: getter, - headerGetter: headerGetter, + headerServ: headerServ, headerSub: headerSub, - metrics: nil, // Will be initialized via WithMetrics() if needed } } @@ -111,64 +119,98 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan } log.Infow("subscribing for blobs", - "namespaces", ns.String(), + "namespace", ns.String(), ) headerCh, err := s.headerSub(ctx) if err != nil { return nil, err } - blobCh := make(chan *SubscriptionResponse, 16) + blobCh := make(chan *SubscriptionResponse, subscriptionBufferSize) go func() { defer close(blobCh) for { select { + case <-ctx.Done(): + log.Debugw("blobsub: stopping, context canceled", "namespace", ns.ID()) + return + case <-s.ctx.Done(): + log.Debugw("blobsub: stopping, service stopped", "namespace", ns.ID()) + return case header, ok := <-headerCh: - if ctx.Err() != nil { - log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID()) - return - } if !ok { - log.Errorw("header channel closed for subscription", "namespace", ns.ID()) + log.Errorw("blobsub: header channel closed unexpectedly", "namespace", ns.ID()) return } - // close subscription before buffer overflows if len(blobCh) == cap(blobCh) { - log.Debugw("blobsub: canceling subscription due to buffer overflow from slow reader", "namespace", ns.ID()) + log.Warnw("blobsub: closing subscription, buffer full from slow reader", + "namespace", ns.ID()) return } - - var blobs []*Blob - var err error - for { - blobs, err = s.getAll(ctx, header, []libshare.Namespace{ns}) - if ctx.Err() != nil { - // context canceled, continuing would lead to unexpected missed heights for the client - log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID()) - return - } - if err == nil { - // operation successful, break the loop - break - } - } - - select { - case <-ctx.Done(): - log.Debugw("blobsub: pending response canceled due to user ctx closing", "namespace", ns.ID()) + if !s.fetchAndSendBlobs(ctx, blobCh, header, ns) { return - case blobCh <- &SubscriptionResponse{ - Blobs: blobs, - Height: header.Height(), - Header: &header.RawHeader, - }: } + } + } + }() + return blobCh, nil +} + +// SubscribeFromStartHeight returns a channel that will receive SubscriptionResponse objects +// starting from the given startHeight. It sequentially waits for each height to become +// available and retrieves blobs at that height. +// +// Blob retrieval errors are retried until successful. +// The channel will be closed when the context is canceled or the service is stopped. +// Not reading from the returned channel will block additional messages. +func (s *Service) SubscribeFromStartHeight( + ctx context.Context, + ns libshare.Namespace, + startHeight uint64, +) (<-chan *SubscriptionResponse, error) { + if s.ctx == nil { + return nil, fmt.Errorf("service has not been started") + } + + // startHeight must be > 0 and <= current head. + if _, err := s.headerServ.GetByHeight(ctx, startHeight); err != nil { + return nil, fmt.Errorf("failed to fetch header at startHeight %d: %w", startHeight, err) + } + + log.Infow("subscribing for blobs with start height", + "namespace", ns.String(), + "startHeight", startHeight, + ) + + // blobCh is unbounded in the sense that it is never closed due to a full buffer. + // When the buffer is full, sends block until the consumer reads. + blobCh := make(chan *SubscriptionResponse, subscriptionBufferSize) + go func() { + defer close(blobCh) + + // cancel ctx when the service stops so WaitForHeight unblocks + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go func() { + select { + case <-s.ctx.Done(): + cancel() case <-ctx.Done(): - log.Debugw("blobsub: canceling subscription due to user ctx closing", "namespace", ns.ID()) + } + }() + + for height := startHeight; ; height++ { + h, err := s.headerServ.WaitForHeight(ctx, height) + if err != nil { + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { + log.Errorw("blobsub: failed waiting for header", + "namespace", ns.ID(), "height", height, "err", err) + } return - case <-s.ctx.Done(): - log.Debugw("blobsub: canceling subscription due to service ctx closing", "namespace", ns.ID()) + } + + if !s.fetchAndSendBlobs(ctx, blobCh, h, ns) { return } } @@ -176,6 +218,49 @@ func (s *Service) Subscribe(ctx context.Context, ns libshare.Namespace) (<-chan return blobCh, nil } +// fetchAndSendBlobs fetches blobs for the given header and namespace, retrying on error, +// and sends the response on blobCh. Returns false if the caller should stop. +func (s *Service) fetchAndSendBlobs( + ctx context.Context, + blobCh chan *SubscriptionResponse, + header *header.ExtendedHeader, + ns libshare.Namespace, +) bool { + var blobs []*Blob + var err error + for { + blobs, err = s.getAll(ctx, header, []libshare.Namespace{ns}) + if ctx.Err() != nil || s.ctx.Err() != nil { + return false + } + if err == nil { + break + } + log.Debugw("blobsub: retrying blob retrieval", + "namespace", ns.ID(), "height", header.Height(), "err", err) + select { + case <-time.After(100 * time.Millisecond): + case <-ctx.Done(): + return false + case <-s.ctx.Done(): + return false + } + } + + select { + case <-ctx.Done(): + return false + case <-s.ctx.Done(): + return false + case blobCh <- &SubscriptionResponse{ + Blobs: blobs, + Height: header.Height(), + Header: &header.RawHeader, + }: + return true + } +} + // Submit sends PFB transaction and reports the height at which it was included. // Allows sending multiple Blobs atomically synchronously. // Uses default wallet registered on the Node. @@ -317,7 +402,7 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []libsha "namespaces", namespaces, ) - header, err := s.headerGetter(ctx, height) + header, err := s.headerServ.GetByHeight(ctx, height) if err != nil { return nil, err } @@ -420,7 +505,7 @@ func (s *Service) retrieve( ) span := trace.SpanFromContext(ctx) - header, err := s.headerGetter(ctx, height) + header, err := s.headerServ.GetByHeight(ctx, height) if err != nil { return nil, nil, err } @@ -633,7 +718,7 @@ func (s *Service) GetCommitmentProof( "getting the extended header", "height", height, ) - header, err := s.headerGetter(ctx, height) + header, err := s.headerServ.GetByHeight(ctx, height) if err != nil { return nil, err } diff --git a/blob/service_test.go b/blob/service_test.go index 787c3d88a..e013e954c 100644 --- a/blob/service_test.go +++ b/blob/service_test.go @@ -10,6 +10,7 @@ import ( "math" "slices" "sort" + "sync/atomic" "testing" "time" @@ -25,6 +26,7 @@ import ( libshare "github.com/celestiaorg/go-square/v3/share" "github.com/celestiaorg/rsmt2d" + "github.com/celestiaorg/celestia-node/blob/mocks" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/header/headertest" "github.com/celestiaorg/celestia-node/libs/utils" @@ -133,7 +135,7 @@ func TestBlobService_Get(t *testing.T) { return val < 0 }) - h, err := service.headerGetter(ctx, 1) + h, err := service.headerServ.GetByHeight(ctx, 1) require.NoError(t, err) resultShares, err := BlobsToShares(blobs...) @@ -226,7 +228,7 @@ func TestBlobService_Get(t *testing.T) { expectedResult: func(res any, err error) { require.NoError(t, err) - header, err := service.headerGetter(ctx, 1) + header, err := service.headerServ.GetByHeight(ctx, 1) require.NoError(t, err) proof, ok := res.(*Proof) @@ -341,7 +343,7 @@ func TestBlobService_Get(t *testing.T) { proofs, ok := i.([]*Proof) require.True(t, ok) - h, err := service.headerGetter(ctx, 1) + h, err := service.headerServ.GetByHeight(ctx, 1) require.NoError(t, err) originalDataWidth := len(h.DAH.RowRoots) / 2 @@ -482,7 +484,7 @@ func TestService_GetSingleBlobWithoutPadding(t *testing.T) { resultShares, err := BlobsToShares(newBlob) require.NoError(t, err) - h, err := service.headerGetter(ctx, 1) + h, err := service.headerServ.GetByHeight(ctx, 1) require.NoError(t, err) row, col := calculateIndex(len(h.DAH.RowRoots), newBlob.index) idx := shwap.SampleCoords{Row: row, Col: col} @@ -509,7 +511,7 @@ func TestService_Get(t *testing.T) { require.NoError(t, err) service := createService(ctx, t, shares) - h, err := service.headerGetter(ctx, 1) + h, err := service.headerServ.GetByHeight(ctx, 1) require.NoError(t, err) resultShares, err := BlobsToShares(blobs...) @@ -577,7 +579,7 @@ func TestService_GetAllWithoutPadding(t *testing.T) { resultShares, err := BlobsToShares(newBlobs...) require.NoError(t, err) - h, err := service.headerGetter(ctx, 1) + h, err := service.headerServ.GetByHeight(ctx, 1) require.NoError(t, err) shareOffset := 0 for i, blob := range newBlobs { @@ -827,6 +829,171 @@ func TestService_Subscribe_MultipleNamespaces(t *testing.T) { assert.Empty(t, emptyBlobResponse.Blobs) } +func TestService_SubscribeFromStartHeight(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + libBlobs, err := libshare.GenerateV0Blobs([]int{16, 16, 16, 16, 16}, true) + require.NoError(t, err) + blobs, err := convertBlobs(libBlobs...) + require.NoError(t, err) + + service, headers := createServiceWithSub(ctx, t, blobs) + err = service.Start(ctx) + require.NoError(t, err) + + t.Run("catchup and live", func(t *testing.T) { + ns := blobs[0].Namespace() + + subCh, err := service.SubscribeFromStartHeight(ctx, ns, 1) + require.NoError(t, err) + + for i := uint64(0); i < uint64(len(blobs)); i++ { + select { + case resp := <-subCh: + assert.Equal(t, &headers[i].RawHeader, resp.Header) + assert.Equal(t, headers[i].Height(), resp.Height) + assert.Equal(t, blobs[i].Data(), resp.Blobs[0].Data()) + case <-time.After(time.Second * 3): + t.Fatalf("timeout waiting for subscription response %d", i+1) + } + } + }) + + t.Run("start from middle height", func(t *testing.T) { + ns := blobs[0].Namespace() + startHeight := uint64(3) + + subCh, err := service.SubscribeFromStartHeight(ctx, ns, startHeight) + require.NoError(t, err) + + // receive responses starting from height 3. + for i := startHeight; i <= uint64(len(blobs)); i++ { + select { + case resp := <-subCh: + assert.Equal(t, &headers[i-1].RawHeader, resp.Header) + assert.Equal(t, i, resp.Height) + case <-time.After(time.Second * 3): + t.Fatalf("timeout waiting for subscription response at height %d", i) + } + } + }) + + t.Run("zero start height returns error", func(t *testing.T) { + ns := blobs[0].Namespace() + _, err := service.SubscribeFromStartHeight(ctx, ns, 0) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to fetch header at startHeight 0") + assert.Contains(t, err.Error(), "height is equal to 0") + }) + + t.Run("start height beyond network head returns error", func(t *testing.T) { + ns := blobs[0].Namespace() + _, err := service.SubscribeFromStartHeight(ctx, ns, 999) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to fetch header at startHeight 999") + assert.Contains(t, err.Error(), "from the future") + }) + + t.Run("user cancellation closes subscription", func(t *testing.T) { + subCtx, subCancel := context.WithCancel(ctx) + ns := blobs[0].Namespace() + + subCh, err := service.SubscribeFromStartHeight(subCtx, ns, 1) + require.NoError(t, err) + + // receive the first response, then cancel. + select { + case <-subCh: + subCancel() + case <-ctx.Done(): + subCancel() + t.Fatal("timeout waiting for first subscription response") + } + + // channel should close after cancellation. + for { + select { + case _, ok := <-subCh: + if !ok { + return + } + case <-ctx.Done(): + t.Fatal("timeout waiting for subscription channel to close") + } + } + }) + + t.Run("service shutdown closes subscription", func(t *testing.T) { + ns := blobs[0].Namespace() + + // tip at 2: heights 1-2 available, WaitForHeight(3) will block + headerServ, _ := newMockHeaderServiceWithTip(t, headers, 2) + + svc := NewService(nil, service.shareGetter, headerServ, nil) + require.NoError(t, svc.Start(ctx)) + + subCh, err := svc.SubscribeFromStartHeight(ctx, ns, 1) + require.NoError(t, err) + + // consume heights 1-2, then WaitForHeight(3) blocks + for i := uint64(1); i <= 2; i++ { + select { + case resp := <-subCh: + assert.Equal(t, i, resp.Height) + case <-time.After(time.Second * 3): + t.Fatalf("timeout waiting for response at height %d", i) + } + } + + // stop the service while WaitForHeight(3) is blocking + require.NoError(t, svc.Stop(context.Background())) + + // channel should close after service shutdown + select { + case _, ok := <-subCh: + assert.False(t, ok, "expected subscription channel to be closed") + case <-time.After(time.Second * 3): + t.Fatal("timeout waiting for subscription channel to close after service stop") + } + }) + + t.Run("WaitForHeight blocks until available", func(t *testing.T) { + ns := blobs[0].Namespace() + + headerServ, tip := newMockHeaderServiceWithTip(t, headers, 2) + + svc := NewService(nil, service.shareGetter, headerServ, nil) + require.NoError(t, svc.Start(ctx)) + t.Cleanup(func() { _ = svc.Stop(ctx) }) + + subCh, err := svc.SubscribeFromStartHeight(ctx, ns, 1) + require.NoError(t, err) + + // Heights 1-2 should arrive immediately. + for i := uint64(1); i <= 2; i++ { + select { + case resp := <-subCh: + assert.Equal(t, i, resp.Height, "expected height %d", i) + case <-time.After(time.Second * 3): + t.Fatalf("timeout waiting for response at height %d", i) + } + } + + // Advance the tip so heights 3-5 become available. + tip.Store(uint64(len(headers))) + + for i := uint64(3); i <= 5; i++ { + select { + case resp := <-subCh: + assert.Equal(t, i, resp.Height, "expected height %d", i) + case <-time.After(time.Second * 3): + t.Fatalf("timeout waiting for response at height %d", i) + } + } + }) +} + // BenchmarkGetByCommitment-12 1869 571663 ns/op 1085371 B/op 6414 allocs/op func BenchmarkGetByCommitment(b *testing.B) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) @@ -856,6 +1023,45 @@ func BenchmarkGetByCommitment(b *testing.B) { } } +func newMockHeaderServiceWithTip( + t *testing.T, + headers []*header.ExtendedHeader, + initialTip uint64, +) (*mocks.MockHeaderService, *atomic.Uint64) { + var tip atomic.Uint64 + tip.Store(initialTip) + + ctrl := gomock.NewController(t) + headerServ := mocks.NewMockHeaderService(ctrl) + headerServ.EXPECT().GetByHeight(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(_ context.Context, height uint64) (*header.ExtendedHeader, error) { + if height == 0 { + return nil, fmt.Errorf("height is equal to 0") + } + if height > uint64(len(headers)) || height > tip.Load() { + return nil, fmt.Errorf("header: given height is from the future: "+ + "networkHeight: %d, requestedHeight: %d", tip.Load(), height) + } + return headers[height-1], nil + }) + headerServ.EXPECT().WaitForHeight(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { + for height > tip.Load() { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(10 * time.Millisecond): + } + } + if height > uint64(len(headers)) { + return nil, fmt.Errorf("header: given height is from the future: "+ + "networkHeight: %d, requestedHeight: %d", len(headers), height) + } + return headers[height-1], nil + }) + return headerServ, &tip +} + func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) (*Service, []*header.ExtendedHeader) { bs := ipld.NewMemBlockservice() batching := ds_sync.MutexWrap(ds.NewMapDatastore()) @@ -876,11 +1082,24 @@ func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) (*Se err = headerStore.Append(ctx, headers...) require.NoError(t, err) - fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { + ctrl := gomock.NewController(t) + headerFn := func(_ context.Context, height uint64) (*header.ExtendedHeader, error) { + if height == 0 { + return nil, fmt.Errorf("height is equal to 0") + } + if height > uint64(len(headers)) { + return nil, fmt.Errorf("header: given height is from the future: "+ + "networkHeight: %d, requestedHeight: %d", len(headers), height) + } return headers[height-1], nil - // return headerStore.GetByHeight(ctx, height) } - fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { + headerServ := mocks.NewMockHeaderService(ctrl) + headerServ.EXPECT().GetByHeight(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(headerFn) + headerServ.EXPECT().WaitForHeight(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(headerFn) + + headerSub := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { headerChan := make(chan *header.ExtendedHeader, len(headers)) defer func() { for _, h := range headers { @@ -890,9 +1109,8 @@ func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) (*Se }() return headerChan, nil } - ctrl := gomock.NewController(t) - shareGetter := mock.NewMockGetter(ctrl) + shareGetter := mock.NewMockGetter(ctrl) shareGetter.EXPECT().GetNamespaceData(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes(). DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, ns libshare.Namespace) (shwap.NamespaceData, error) { idx := int(h.Height()) - 1 @@ -900,7 +1118,7 @@ func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) (*Se nd, err := eds.NamespaceData(ctx, accessor, ns) return nd, err }) - return NewService(nil, shareGetter, fn, fn2), headers + return NewService(nil, shareGetter, headerServ, headerSub), headers } func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *Service { @@ -938,13 +1156,19 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) * err = headerStore.Append(ctx, h) require.NoError(t, err) - fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { - return headerStore.GetByHeight(ctx, height) - } - fn2 := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { + headerServ := mocks.NewMockHeaderService(ctrl) + headerServ.EXPECT().GetByHeight(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { + return headerStore.GetByHeight(ctx, height) + }) + headerServ.EXPECT().WaitForHeight(gomock.Any(), gomock.Any()).AnyTimes(). + DoAndReturn(func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { + return headerStore.GetByHeight(ctx, height) + }) + headerSub := func(ctx context.Context) (<-chan *header.ExtendedHeader, error) { return nil, fmt.Errorf("not implemented") } - return NewService(nil, shareGetter, fn, fn2) + return NewService(nil, shareGetter, headerServ, headerSub) } // TestProveCommitmentAllCombinations tests proving all the commitments in a block. diff --git a/nodebuilder/blob/blob.go b/nodebuilder/blob/blob.go index 8180d96f7..eba20ac6a 100644 --- a/nodebuilder/blob/blob.go +++ b/nodebuilder/blob/blob.go @@ -44,6 +44,14 @@ type Module interface { ) (*blob.CommitmentProof, error) // Subscribe to published blobs from the given namespace as they are included. Subscribe(_ context.Context, _ libshare.Namespace) (<-chan *blob.SubscriptionResponse, error) + // SubscribeFromStartHeight subscribes to published blobs from the given namespace, + // starting from the given height. It first replays historical blobs from startHeight + // to the current head, then streams new blobs as they arrive. + SubscribeFromStartHeight( + _ context.Context, + _ libshare.Namespace, + startHeight uint64, + ) (<-chan *blob.SubscriptionResponse, error) } type API struct { @@ -87,6 +95,11 @@ type API struct { context.Context, libshare.Namespace, ) (<-chan *blob.SubscriptionResponse, error) `perm:"read"` + SubscribeFromStartHeight func( + context.Context, + libshare.Namespace, + uint64, + ) (<-chan *blob.SubscriptionResponse, error) `perm:"read"` } } @@ -141,3 +154,11 @@ func (api *API) Subscribe( ) (<-chan *blob.SubscriptionResponse, error) { return api.Internal.Subscribe(ctx, namespace) } + +func (api *API) SubscribeFromStartHeight( + ctx context.Context, + namespace libshare.Namespace, + startHeight uint64, +) (<-chan *blob.SubscriptionResponse, error) { + return api.Internal.SubscribeFromStartHeight(ctx, namespace, startHeight) +} diff --git a/nodebuilder/blob/mocks/api.go b/nodebuilder/blob/mocks/api.go index 8f83d593d..fba95951e 100644 --- a/nodebuilder/blob/mocks/api.go +++ b/nodebuilder/blob/mocks/api.go @@ -141,3 +141,18 @@ func (mr *MockModuleMockRecorder) Subscribe(arg0, arg1 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockModule)(nil).Subscribe), arg0, arg1) } + +// SubscribeFromStartHeight mocks base method. +func (m *MockModule) SubscribeFromStartHeight(arg0 context.Context, arg1 share.Namespace, arg2 uint64) (<-chan *blob.SubscriptionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeFromStartHeight", arg0, arg1, arg2) + ret0, _ := ret[0].(<-chan *blob.SubscriptionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SubscribeFromStartHeight indicates an expected call of SubscribeFromStartHeight. +func (mr *MockModuleMockRecorder) SubscribeFromStartHeight(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeFromStartHeight", reflect.TypeOf((*MockModule)(nil).SubscribeFromStartHeight), arg0, arg1, arg2) +} diff --git a/nodebuilder/blob/module.go b/nodebuilder/blob/module.go index cf07ed373..567c3b4f7 100644 --- a/nodebuilder/blob/module.go +++ b/nodebuilder/blob/module.go @@ -15,8 +15,8 @@ import ( func ConstructModule() fx.Option { return fx.Module("blob", fx.Provide( - func(service headerService.Module) func(context.Context, uint64) (*header.ExtendedHeader, error) { - return service.GetByHeight + func(service headerService.Module) blob.HeaderService { + return service }, ), fx.Provide( @@ -28,10 +28,10 @@ func ConstructModule() fx.Option { func( state state.Module, sGetter shwap.Getter, - getByHeightFn func(context.Context, uint64) (*header.ExtendedHeader, error), + headerServ blob.HeaderService, subscribeFn func(context.Context) (<-chan *header.ExtendedHeader, error), ) *blob.Service { - return blob.NewService(state, sGetter, getByHeightFn, subscribeFn) + return blob.NewService(state, sGetter, headerServ, subscribeFn) }, fx.OnStart(func(ctx context.Context, serv *blob.Service) error { return serv.Start(ctx) diff --git a/nodebuilder/da/service.go b/nodebuilder/da/service.go index 6e5021900..567b8503a 100644 --- a/nodebuilder/da/service.go +++ b/nodebuilder/da/service.go @@ -15,7 +15,6 @@ import ( libshare "github.com/celestiaorg/go-square/v3/share" "github.com/celestiaorg/celestia-node/blob" - "github.com/celestiaorg/celestia-node/header" nodeblob "github.com/celestiaorg/celestia-node/nodebuilder/blob" "github.com/celestiaorg/celestia-node/state" ) @@ -30,8 +29,8 @@ var log = logging.Logger("go-da") const heightLen = 8 type Service struct { - blobServ nodeblob.Module - headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error) + blobServ nodeblob.Module + headerServ blob.HeaderService } // SubmitOptions defines options for blob submission using SubmitWithOptions. @@ -51,11 +50,11 @@ type SubmitOptions struct { func NewService( blobMod nodeblob.Module, - headerGetter func(context.Context, uint64) (*header.ExtendedHeader, error), + headerServ blob.HeaderService, ) *Service { return &Service{ - blobServ: blobMod, - headerGetter: headerGetter, + blobServ: blobMod, + headerServ: headerServ, } } @@ -130,7 +129,7 @@ func (s *Service) GetIDs(ctx context.Context, height uint64, namespace da.Namesp for _, b := range blobs { ids = append(ids, MakeID(height, b.Commitment)) } - h, err := s.headerGetter(ctx, height) + h, err := s.headerServ.GetByHeight(ctx, height) if err != nil { return nil, err }