Skip to content

Commit 05f41a8

Browse files
committed
add some fixes
1 parent baa5ade commit 05f41a8

File tree

9 files changed

+258
-52
lines changed

9 files changed

+258
-52
lines changed

blob/blob.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,22 @@ func (p Proof) verify(blob *Blob, header *header.ExtendedHeader) error {
3434
if err != nil {
3535
return err
3636
}
37-
toCoords, err := shwap.SampleCoordsFrom1DIndex(blob.Index()+len(shrs)-1, len(header.DAH.RowRoots))
38-
if err != nil {
39-
return err
40-
}
4137

4238
hasher := share.NewSHA256Hasher()
43-
for from, i := fromCoords.Row, 0; from <= toCoords.Row; from++ {
39+
shareOffset := 0
40+
for proofIdx := 0; proofIdx < len(p); proofIdx++ {
4441
hasher.Reset()
45-
sharesPerRow := p[i].End() - p[i].Start()
46-
valid := p[i].VerifyInclusion(
42+
sharesInProof := p[proofIdx].End() - p[proofIdx].Start()
43+
valid := p[proofIdx].VerifyInclusion(
4744
hasher,
4845
blob.Namespace().Bytes(),
49-
libshare.ToBytes(shrs[i:sharesPerRow]),
50-
header.DAH.RowRoots[from],
46+
libshare.ToBytes(shrs[shareOffset:shareOffset+sharesInProof]),
47+
header.DAH.RowRoots[fromCoords.Row+proofIdx],
5148
)
5249
if !valid {
53-
return errors.New("invalid share proof")
50+
return ErrInvalidProof
5451
}
55-
i += sharesPerRow
52+
shareOffset += sharesInProof
5653
}
5754
return nil
5855
}

blob/helper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func fromShwapBlob(shBlob *shwap.Blob, odsSize int) (*Blob, error) {
7575
return nil, err
7676
}
7777

78-
edsIndex, err := shwap.SampleCoordsAs1DIndex(coords, odsIndex*2)
78+
edsIndex, err := shwap.SampleCoordsAs1DIndex(coords, odsSize*2)
7979
if err != nil {
8080
return nil, err
8181
}

blob/metrics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package blob
33
import (
44
"context"
55
"errors"
6+
"github.com/celestiaorg/celestia-node/share/shwap"
67
"time"
78

89
"go.opentelemetry.io/otel"
@@ -91,7 +92,7 @@ func (m *metrics) observeRetrieval(ctx context.Context, duration time.Duration,
9192
if err != nil {
9293
errorType := errorTypeUnknown
9394
switch {
94-
case errors.Is(err, ErrBlobNotFound):
95+
case errors.Is(err, shwap.ErrBlobNotFound):
9596
errorType = errorTypeNotFound
9697
case errors.Is(err, context.DeadlineExceeded):
9798
errorType = errorTypeTimeout

blob/service.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
)
3131

3232
var (
33-
ErrBlobNotFound = errors.New("blob: not found")
3433
ErrInvalidProof = errors.New("blob: invalid proof")
3534

3635
log = logging.Logger("blob")
@@ -296,7 +295,7 @@ func (s *Service) GetProof(
296295
// the user will receive all found blobs along with a combined error message.
297296
//
298297
// All blobs will preserve the order of the namespaces that were requested.
299-
func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []libshare.Namespace) (_ []*Blob, err error) {
298+
func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []libshare.Namespace) (blobs []*Blob, err error) {
300299
ctx, span := tracer.Start(ctx, "blob/get-all")
301300
defer func() {
302301
utils.SetStatusAndEnd(span, err)
@@ -318,49 +317,62 @@ func (s *Service) GetAll(ctx context.Context, height uint64, namespaces []libsha
318317

319318
header, err := s.headerGetter(ctx, height)
320319
if err != nil {
321-
return nil, err
320+
return nil, fmt.Errorf("err getting header: %w", err)
321+
}
322+
323+
blobs, err = s.getAll(ctx, header, namespaces)
324+
if err != nil {
325+
return blobs, fmt.Errorf("err getting blobs: %w", err)
322326
}
323327

324-
return s.getAll(ctx, header, namespaces)
328+
namespaceStrings := make([]string, len(namespaces))
329+
for i := range namespaces {
330+
namespaceStrings[i] = namespaces[i].String()
331+
}
332+
span.SetAttributes(attribute.StringSlice("namespaces", namespaceStrings))
333+
span.SetAttributes(attribute.Int("total", len(blobs)))
334+
return blobs, nil
325335
}
326336

327337
func (s *Service) getAll(
328338
ctx context.Context,
329339
header *header.ExtendedHeader,
330340
namespaces []libshare.Namespace,
331-
) ([]*Blob, error) {
341+
) (_ []*Blob, err error) {
332342
var (
333-
span = trace.SpanFromContext(ctx)
334-
namespaceStrings = make([]string, len(namespaces))
335-
resultBlobs = make([][]*shwap.Blob, len(namespaces))
336-
resultErr = make([]error, len(namespaces))
337-
wg = sync.WaitGroup{}
343+
resultBlobs = make([][]*shwap.Blob, len(namespaces))
344+
resultErr = make([]error, len(namespaces))
345+
wg = sync.WaitGroup{}
338346
)
339347

340348
for i, namespace := range namespaces {
341349
wg.Add(1)
342350
go func(i int, namespace libshare.Namespace) {
343351
defer wg.Done()
352+
log.Debugw("retrieving all blobs from", "namespace", namespace.String(), "height", header.Height())
344353
resultBlobs[i], resultErr[i] = s.shareGetter.GetBlobs(ctx, header, namespace)
354+
if errors.Is(resultErr[i], shwap.ErrNotFound) {
355+
resultErr[i] = nil
356+
}
345357
}(i, namespace)
346-
347-
namespaceStrings[i] = namespace.String()
348358
}
349-
span.SetAttributes(attribute.StringSlice("namespaces", namespaceStrings))
350359
wg.Wait()
351360

352361
blbs := slices.Concat(resultBlobs...)
353-
errs := errors.Join(resultErr...)
362+
err = errors.Join(resultErr...)
363+
if len(blbs) == 0 {
364+
return nil, err
365+
}
354366

355367
blobs := make([]*Blob, len(blbs))
356368
for i, blb := range blbs {
357-
blob, err := fromShwapBlob(blb, len(header.DAH.RowRoots)/2)
369+
blob, convertErr := fromShwapBlob(blb, len(header.DAH.RowRoots)/2)
358370
if err != nil {
359-
errs = errors.Join(errs, err)
371+
err = errors.Join(err, convertErr)
360372
}
361373
blobs[i] = blob
362374
}
363-
return blobs, errs
375+
return blobs, err
364376
}
365377

366378
// Included verifies that the blob was included in a specific height.
@@ -406,6 +418,9 @@ func (s *Service) Included(
406418

407419
shwapBlob, err := s.shareGetter.GetBlob(ctx, header, namespace, commitment)
408420
if err != nil {
421+
if errors.Is(err, shwap.ErrBlobNotFound) {
422+
err = nil
423+
}
409424
return false, err
410425
}
411426

blob/service_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,15 +409,15 @@ func TestBlobService_Get(t *testing.T) {
409409
innerGetter := service.shareGetter
410410
getterWrapper := mock.NewMockGetter(ctrl)
411411
getterWrapper.EXPECT().
412-
GetNamespaceData(gomock.Any(), gomock.Any(), gomock.Any()).
412+
GetBlobs(gomock.Any(), gomock.Any(), gomock.Any()).
413413
DoAndReturn(
414414
func(
415415
ctx context.Context, h *header.ExtendedHeader, ns libshare.Namespace,
416-
) (shwap.NamespaceData, error) {
416+
) ([]*shwap.Blob, error) {
417417
if ns.Equals(blobsWithDiffNamespaces[0].Namespace()) {
418418
return nil, errors.New("internal error")
419419
}
420-
return innerGetter.GetNamespaceData(ctx, h, ns)
420+
return innerGetter.GetBlobs(ctx, h, ns)
421421
}).AnyTimes()
422422

423423
service.shareGetter = getterWrapper
@@ -929,7 +929,6 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *
929929
func(ctx context.Context,
930930
h *header.ExtendedHeader,
931931
ns libshare.Namespace,
932-
com []byte,
933932
) ([]*shwap.Blob, error) {
934933
return accessor.Blobs(ctx, ns)
935934
})

share/shwap/blob.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ const blobName = "blob_v0"
1919

2020
var subtreeRootThreshold = appconsts.SubtreeRootThreshold
2121

22+
var ErrBlobNotFound = errors.New("blob: not found")
23+
2224
// Blob represents a retrieved blob from the data square containing
2325
// the blob data along with proofs for the verification and blob's index(position inside the ODS).
2426
type Blob struct {
@@ -50,7 +52,7 @@ func BlobFromShares(
5052
colStart := 0
5153
for rowStart := 0; rowStart < len(libShares); {
5254
shr := libShares[rowStart][colStart]
53-
if !shr.Namespace().Equals(namespace) || !shr.IsSequenceStart() {
55+
if !shr.Namespace().Equals(namespace) || !shr.IsSequenceStart() || shr.IsPadding() {
5456
colStart++
5557
if colStart >= odsSize {
5658
colStart = 0
@@ -75,11 +77,15 @@ func BlobFromShares(
7577
rngShares := libShares[rowStart : to.Row+1]
7678
shrs := make([]libshare.Share, 0, sharesAmount)
7779
for i := range rngShares {
78-
col := odsSize
80+
startCol := 0
81+
endCol := odsSize
82+
if i == 0 {
83+
startCol = from.Col // account for starting column on first row
84+
}
7985
if i == len(rngShares)-1 {
80-
col = to.Col + 1
86+
endCol = to.Col + 1
8187
}
82-
shrs = append(shrs, rngShares[i][:col]...)
88+
shrs = append(shrs, rngShares[i][startCol:endCol]...)
8389
}
8490

8591
blobs, err := libshare.ParseBlobs(shrs)
@@ -113,7 +119,7 @@ func BlobFromShares(
113119
StartIndex: fromIndex,
114120
}, err
115121
}
116-
return nil, errors.New("blob not found")
122+
return nil, ErrBlobNotFound
117123
}
118124

119125
func (b *Blob) VerifyInclusion(roots [][]byte) error {
@@ -228,7 +234,7 @@ func BlobsFromShares(
228234
colStart := 0
229235
for rowStart := 0; rowStart < len(libShares); {
230236
shr := libShares[rowStart][colStart]
231-
if !shr.Namespace().Equals(namespace) || !shr.IsSequenceStart() {
237+
if !shr.Namespace().Equals(namespace) || !shr.IsSequenceStart() || shr.IsPadding() {
232238
colStart++
233239
if colStart >= odsSize {
234240
colStart = 0
@@ -265,7 +271,7 @@ func BlobsFromShares(
265271
}
266272

267273
if len(blobs) == 0 {
268-
return nil, errors.New("blobs were empty")
274+
return nil, ErrNotFound
269275
}
270276
return blobs, nil
271277
}

0 commit comments

Comments
 (0)