Skip to content

Commit ff2a02e

Browse files
authored
Merge pull request #41 from mreiferson/issue-38
check current file against maxBytesPerFile
2 parents cc41549 + 879d93e commit ff2a02e

File tree

3 files changed

+90
-13
lines changed

3 files changed

+90
-13
lines changed

.github/workflows/test.yml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@ jobs:
1212
fail-fast: false
1313
matrix:
1414
gover:
15-
- "1.13"
16-
- "1.14"
17-
- "1.15"
18-
- "1.16"
19-
- "1.17"
15+
- "1.18"
16+
- "1.19"
17+
- "1.20"
2018
goarch:
2119
- "amd64"
2220
- "386"
@@ -40,4 +38,8 @@ jobs:
4038
- name: lint
4139
run: |
4240
go vet
43-
gofmt -w *.go && git diff --exit-code
41+
GOFMT_OUT=$(gofmt -l *.go)
42+
if [ -n "$GOFMT_OUT" ]; then
43+
echo $GOFMT_OUT
44+
exit 1
45+
fi

diskqueue.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (d *diskQueue) readOne() ([]byte, error) {
349349
// we only consider rotating if we're reading a "complete" file
350350
// and since we cannot know the size at which it was rotated, we
351351
// rely on maxBytesPerFileRead rather than maxBytesPerFile
352-
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
352+
if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
353353
if d.readFile != nil {
354354
d.readFile.Close()
355355
d.readFile = nil
@@ -394,6 +394,7 @@ func (d *diskQueue) writeOne(data []byte) error {
394394
d.writeFile = nil
395395
}
396396
}
397+
397398
if d.writeFile == nil {
398399
curFileName := d.fileName(d.writeFileNum)
399400
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
@@ -675,7 +676,7 @@ func (d *diskQueue) ioLoop() {
675676
count = 0
676677
}
677678

678-
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
679+
if d.readFileNum < d.writeFileNum || (d.readFileNum == d.writeFileNum && d.readPos < d.writePos) {
679680
if d.nextReadPos == d.readPos {
680681
dataRead, err = d.readOne()
681682
if err != nil {

diskqueue_test.go

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"bufio"
55
"bytes"
66
"fmt"
7+
"io/fs"
78
"io/ioutil"
89
"os"
910
"path"
1011
"path/filepath"
1112
"reflect"
1213
"runtime"
1314
"strconv"
15+
"strings"
1416
"sync"
1517
"sync/atomic"
1618
"testing"
@@ -308,17 +310,22 @@ func TestDiskQueueCorruption(t *testing.T) {
308310
dqFn := dq.(*diskQueue).fileName(1)
309311
os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted
310312

311-
for i := 0; i < 19; i++ { // 1 message leftover in 4th file
312-
Equal(t, msg, <-dq.ReadChan())
313-
}
314-
315313
// corrupt the 4th (current) file
316314
dqFn = dq.(*diskQueue).fileName(3)
317315
os.Truncate(dqFn, 100)
318316

317+
for i := 0; i < 18; i++ {
318+
Equal(t, msg, <-dq.ReadChan())
319+
}
320+
Equal(t, int64(7), dq.Depth())
321+
322+
Equal(t, msg, <-dq.ReadChan())
323+
Equal(t, int64(0), dq.Depth())
324+
319325
dq.Put(msg) // in 5th file
320326

321327
Equal(t, msg, <-dq.ReadChan())
328+
Equal(t, int64(0), dq.Depth())
322329

323330
// write a corrupt (len 0) message at the 5th (current) file
324331
dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0})
@@ -331,14 +338,15 @@ func TestDiskQueueCorruption(t *testing.T) {
331338

332339
dq.Put(msg)
333340
dq.Put(msg)
341+
334342
// corrupt the last file
335343
dqFn = dq.(*diskQueue).fileName(5)
336344
os.Truncate(dqFn, 100)
337345

338346
Equal(t, int64(2), dq.Depth())
339347

340348
// return one message and try reading again from corrupted file
341-
<-dq.ReadChan()
349+
Equal(t, msg, <-dq.ReadChan())
342350

343351
// give diskqueue time to handle read error
344352
time.Sleep(50 * time.Millisecond)
@@ -773,3 +781,69 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
773781
<-dq.ReadChan()
774782
}
775783
}
784+
785+
func TestDiskQueueRollAsync(t *testing.T) {
786+
l := NewTestLogger(t)
787+
dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix()))
788+
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
789+
if err != nil {
790+
panic(err)
791+
}
792+
defer os.RemoveAll(tmpDir)
793+
msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
794+
ml := int64(len(msg))
795+
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
796+
defer dq.Close()
797+
NotNil(t, dq)
798+
Equal(t, int64(0), dq.Depth())
799+
800+
for i := 0; i < 11; i++ {
801+
err := dq.Put(msg)
802+
Nil(t, err)
803+
Equal(t, int64(1), dq.Depth())
804+
805+
Equal(t, msg, <-dq.ReadChan())
806+
Equal(t, int64(0), dq.Depth())
807+
}
808+
809+
Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
810+
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)
811+
812+
filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error {
813+
if strings.HasSuffix(path, ".bad") {
814+
t.FailNow()
815+
}
816+
817+
return err
818+
})
819+
}
820+
821+
func TestWriteRollReadEOF(t *testing.T) {
822+
l := NewTestLogger(t)
823+
dqName := "test_disk_queue_roll_readEOF" + strconv.Itoa(int(time.Now().Unix()))
824+
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
825+
if err != nil {
826+
panic(err)
827+
}
828+
defer os.RemoveAll(tmpDir)
829+
dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l)
830+
defer dq.Close()
831+
NotNil(t, dq)
832+
Equal(t, int64(0), dq.Depth())
833+
834+
for i := 0; i < 205; i++ { // 204 messages fit, but message 205 will be too big
835+
msg := []byte(fmt.Sprintf("%05d", i)) // 5 bytes
836+
err = dq.Put(msg)
837+
838+
msgOut := <-dq.ReadChan()
839+
Equal(t, msg, msgOut)
840+
}
841+
842+
filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error {
843+
if strings.HasSuffix(path, ".bad") {
844+
t.FailNow()
845+
}
846+
847+
return err
848+
})
849+
}

0 commit comments

Comments
 (0)