Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 71 additions & 38 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"hash/fnv"
"os"
"strings"
"sync"
"time"

"github.com/alecthomas/units"
"go.uber.org/zap"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/frac/sealed/lids"
"github.com/ozontech/seq-db/frac/sealed/token"
Expand All @@ -21,7 +23,7 @@ import (

// Launch as:
//
// > go run ./cmd/index_analyzer/... ./data/*.index | tee ~/report.txt
// > go run ./cmd/index_analyzer/... ./data/*.info | tee ~/report.txt
func main() {
if len(os.Args) < 2 {
fmt.Println("No args")
Expand Down Expand Up @@ -73,45 +75,80 @@ func getCacheMaintainer() (*fracmanager.CacheMaintainer, func()) {
}
}

// basePath strips any known index suffix to return the fraction base path.
// Paths without a recognized suffix are returned unchanged.
func basePath(path string) string {
	knownSuffixes := []string{
		consts.InfoFileSuffix,
		consts.TokenFileSuffix,
		consts.OffsetsFileSuffix,
		consts.IDFileSuffix,
		consts.LIDFileSuffix,
	}
	for _, s := range knownSuffixes {
		// TrimSuffix returns path unchanged when s is not a suffix,
		// so a change in value means the suffix was present and removed.
		if trimmed := strings.TrimSuffix(path, s); trimmed != path {
			return trimmed
		}
	}
	return path
}

func openFile(path string) *os.File {
f, err := os.Open(path)
if err != nil {
panic(err)
}
return f
}

func analyzeIndex(
path string,
cm *fracmanager.CacheMaintainer,
reader *storage.ReadLimiter,
rl *storage.ReadLimiter,
mergedTokensUniq map[string]map[string]int,
allTokensValuesUniq map[string]int,
) Stats {
base := basePath(path)
indexCache := cm.CreateIndexCache()

// Open per-section files.
infoFile := openFile(base + consts.InfoFileSuffix)
tokenFile := openFile(base + consts.TokenFileSuffix)
lidFile := openFile(base + consts.LIDFileSuffix)
defer infoFile.Close()
defer tokenFile.Close()
defer lidFile.Close()

infoReader := storage.NewIndexReader(rl, infoFile.Name(), infoFile, indexCache.InfoRegistry)
tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)

// --- Info ---
var blockIndex uint32
cache := cm.CreateIndexCache()

f, err := os.Open(path)
infoData, _, err := infoReader.ReadIndexBlock(0, nil)
if err != nil {
panic(err)
logger.Fatal("error reading info block", zap.String("file", infoFile.Name()), zap.Error(err))
}
var b sealed.BlockInfo
if err := b.Unpack(infoData); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}
docsCount := int(b.Info.DocsTotal)

indexReader := storage.NewIndexReader(reader, f.Name(), f, cache.Registry)

readBlock := func() []byte {
data, _, err := indexReader.ReadIndexBlock(blockIndex, nil)
// --- Tokens (.token file) ---
// Token blocks start at index 0, followed by an empty separator, then token table blocks.
blockIndex = 0
readTokenBlock := func() []byte {
data, _, err := tokenReader.ReadIndexBlock(blockIndex, nil)
blockIndex++
if err != nil {
logger.Fatal("error reading block", zap.String("file", f.Name()), zap.Error(err))
logger.Fatal("error reading token block", zap.String("file", tokenFile.Name()), zap.Error(err))
}
return data
}

// load info
var b sealed.BlockInfo
if err := b.Unpack(readBlock()); err != nil {
logger.Fatal("error unpacking block info", zap.Error(err))
}

docsCount := int(b.Info.DocsTotal)

// load tokens
tokens := [][]byte{}
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
data := readTokenBlock()
if len(data) == 0 { // empty block - section separator
break
}
block := token.Block{}
Expand All @@ -123,11 +160,10 @@ func analyzeIndex(
}
}

// load tokens table
tokenTableBlocks := []token.TableBlock{}
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
data := readTokenBlock()
if len(data) == 0 { // empty block - section separator
break
}
block := token.TableBlock{}
Expand All @@ -136,28 +172,25 @@ func analyzeIndex(
}
tokenTable := token.TableFromBlocks(tokenTableBlocks)

// skip position
blockIndex++

// skip IDS
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
break
// --- LIDs (.lid file) ---
blockIndex = 0
readLIDBlock := func() []byte {
data, _, err := lidReader.ReadIndexBlock(blockIndex, nil)
blockIndex++
if err != nil {
logger.Fatal("error reading lid block", zap.String("file", lidFile.Name()), zap.Error(err))
}
blockIndex++ // skip RID
blockIndex++ // skip Param
return data
}

// load LIDs
tid := 0
lidsTotal := 0
lidsUniq := map[[16]byte]int{}
lidsLens := make([]int, len(tokens))
tokenLIDs := []uint32{}
for {
data := readBlock()
if len(data) == 0 { // empty block - is section separator
data := readLIDBlock()
if len(data) == 0 { // empty block - section separator
break
}

Expand Down
18 changes: 18 additions & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,26 @@ const (
SdocsTmpFileSuffix = "._sdocs"
SdocsDelFileSuffix = ".sdocs.del"

InfoFileSuffix = ".info"
InfoTmpFileSuffix = "._info"

TokenFileSuffix = ".tokens"
TokenTmpFileSuffix = "._tokens"

OffsetsFileSuffix = ".offsets"
OffsetsTmpFileSuffix = "._offsets"

IDFileSuffix = ".ids"
IDTmpFileSuffix = "._ids"

LIDFileSuffix = ".lids"
LIDTmpFileSuffix = "._lids"

// IndexFileSuffix is the legacy single-file index format (pre-split).
IndexFileSuffix = ".index"
IndexTmpFileSuffix = "._index"
// TODO(dkharms): [IndexDelFileSuffix] is actually not necessary.
// We can remove it in the future releases.
IndexDelFileSuffix = ".index.del"

RemoteFractionSuffix = ".remote"
Expand Down
22 changes: 5 additions & 17 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package frac
import (
"context"
"io"
"math"
"os"
"path/filepath"
"sync"
Expand All @@ -26,9 +25,7 @@ import (
"github.com/ozontech/seq-db/util"
)

var (
_ Fraction = (*Active)(nil)
)
var _ Fraction = (*Active)(nil)

type Active struct {
Config *Config
Expand Down Expand Up @@ -64,16 +61,6 @@ type Active struct {
skipMaskProvider skipMaskProvider
}

const (
systemMID = math.MaxUint64
systemRID = math.MaxUint64
)

var systemSeqID = seq.ID{
MID: systemMID,
RID: systemRID,
}

func NewActive(
baseFileName string,
activeIndexer *ActiveIndexer,
Expand Down Expand Up @@ -116,8 +103,8 @@ func NewActive(
}

// use of 0 as keys in maps is prohibited – it's system key, so add first element
f.MIDs.Append(systemMID)
f.RIDs.Append(systemRID)
f.MIDs.Append(uint64(seq.SystemMID))
f.RIDs.Append(uint64(seq.SystemRID))

logger.Info("active fraction created", zap.String("fraction", baseFileName))

Expand All @@ -128,7 +115,8 @@ func mustOpenMetaWriter(
baseFileName string,
readLimiter *storage.ReadLimiter,
docsFile *os.File,
docsStats os.FileInfo) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
docsStats os.FileInfo,
) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
legacyMetaFileName := baseFileName + consts.MetaFileSuffix

if _, err := os.Stat(legacyMetaFileName); err == nil {
Expand Down
7 changes: 6 additions & 1 deletion frac/active_lids.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ func (tl *TokenLIDs) GetLIDs(mids, rids *UInt64s) []uint32 {
return tl.sorted
}

// SortedLIDsUnsafe returns the pre-merged, sorted LIDs slice without any
// synchronization. It is only safe to call after the fraction is frozen
// and the lids queue has been drained. The internal slice is returned
// directly (no copy), so callers must not mutate it.
func (tl *TokenLIDs) SortedLIDsUnsafe() []uint32 {
	return tl.sorted
}

// SeqIDCmp compares LIDs by the seq.ID components stored at their
// indexes: mid[i] and rid[i] hold the MID and RID for LID i.
// NOTE(review): mid and rid are assumed to be equal-length, LID-indexed
// slices — confirm against the call sites that construct this comparator.
type SeqIDCmp struct {
	mid []uint64
	rid []uint64
}

func (c *SeqIDCmp) compare(a, b uint32) int {

midA, midB := c.mid[a], c.mid[b]

if midA > midB {
Expand Down
Loading
Loading