compact的作用
DB有一个后台线程负责将memtable持久化成sstable,以及均衡整个DB各个level层的sstable。compact分为minor compaction和major compaction。memtable持久化成sstable称为minor compaction,level(n)和level(n+1)之间某些sstable的merge称为major compaction。
compact的时机
compact的种类
Major Compaction
指的是 immutable memtable持久化为 sst 文件
主要流程:
-
将内存中的memsstable格式化成sst文件的格式;
-
选择这个新sst文件放置的level,规则如图 2 所示(来自文献 [2]);
-
将新sst文件放置到第2步选出的level中。
Major Compaction
指的是 sst 文件之间的 compaction
主要分为:
-
Manual Compaction,是人工触发的Compaction,由外部接口调用产生
-
Size Compaction,是根据每个level的总文件大小来触发
第1步:计算的score值,可以得出 max score,从而得出了应该哪一个 level 上进行 Compact,
第2步:假设上面选出的是 level n,那么第 2 步就是选择出需要 Compact 的文件,其包含两步,首先在 level n 中选出需要 Compact 的文件文件(对应第2.1步);然后根据level n选出的文件的key的begin和end来选出 level n+1 层的 sst 文件(对应第2.2步):
第2.1步:确定level n参与Compact的文件列表
2.1.1: 将begin key更新为level n 上次Compact操作的文件的largest key。然后顺序查找level的sst文件,返回第一个largest key > begin key的sst文件,并加入到level n需要Compact的文件列表中;
2.1.2: 如果是n==0,把sst文件都检查一遍,如果存在重叠则加入Compact文件列表中。因为level 0中,所有的文件之间都有可能存在重叠(overlap)。
第2.2步:确定level n+1参与Compact的文件列表;
2.2.1: 计算出level n参与Compact的文件列表的所有sst文件的总和key范围的begin和end;
2.2.2: 根据2.2.1计算出来的begin和end,去获取根level n+1有重叠(overlap)的sst文件列表;
2.2.3: 计算当前的level n 和 n+1参与Compact的两个文件列表的总和,如果小于阈值kExpandedCompactionByteSizeLimit=50M,那么会继续尝试在level n中选择出合适的sst文件,考虑到不影响理解,具体细节暂时省略。
-
Seek Compaction,每个文件的 seek miss 次数都有一个阈值,如果超过了这个阈值,那么认为这个文件需要Compact
在levelDB中,每一个新的sst文件,都有一个 allowed_seek 的初始阈值,表示最多容忍 seek miss 多少次,每个调用 Get seek miss 的时候,就会执行减1(allowed_seek–)。其中 allowed_seek 的初始阈值的计算方式为:
allowed_seeks = (sst文件的file size / 16384); // 16348——16kb if ( allowed_seeks < 100 ) allowed_seeks = 100; LevelDB认为如果一个 sst 文件在 level i 中总是没总到,而是在 level i+1 中找到,那么当这种 seek miss 积累到一定次数之后,就考虑将其从 level i 中合并到 level i+1 中,这样可以避免不必要的 seek miss 消耗 read I/O。当然在引入布隆过滤器后,这种查找消耗的 IO 就会变小很多。
执行条件 当 allowed_seeks 递减到小于0了,那么将标记为需要Compact的文件。但是由于Size Compaction的优先级高于Seek Compaction,所以在不存在Size Compaction的时候,且触发了Compaction,那么Seek Compaction就能执行。
核心过程 计算 sst 的 allowed_seek 都是在 sst 刚开始新建的时候完成;而每次 Get(key)操作都会更新 allowed_seek,当allowed_seeks 递减到小于0了,那么将标记为需要 Compact 的文件。
compact的具体实现
func openDB(s *session) (*DB, error) {
...
if readOnly {
db.SetReadOnly()
} else {
db.closeW.Add(2)
go db.tCompaction()
go db.mCompaction()
// go db.jWriter()
}
...
}
- mCompaction对应着将immutable持久化成sstable
- tCompaction则是对sstable之间的compact
mCompaction
func (db *DB) mCompaction() {
...
for {
select {
case x = <-db.mcompCmdC:
switch x.(type) {
case cAuto:
db.memCompaction()
x.ack(nil)
x = nil
default:
panic("leveldb: unknown command")
}
case <-db.closeC:
return
}
}
}
- mCompaction工作在一个独立的协程中,接收mcompCmdC命令,执行memCompaction操作
memCompaction
func (db *DB) memCompaction() {
// 拿到immutable
mdb := db.getFrozenMem()
...
// 开启一个 "memdb@flush" 事
// Generate tables.
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
// 在事务中处理memdb
flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
stats.stopTimer()
return
}, func() error {
for _, r := range rec.addedTables {
db.logf("memdb@flush revert @%d", r.num)
if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
return err
}
}
return nil
})
...
}
compactionTransactFunc
func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
db.compactionTransact(name, &compactionTransactFunc{run, revert})
}
- 定义了事务的执行方法run以及事务的恢复方法revert
flushMemdb
func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
// Create sorted table.
iter := mdb.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
return 0, err
}
// 生成的sstable需要被放入哪一层有一套判断方式
flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
rec.addTableFile(flushLevel, t)
s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
return flushLevel, nil
}
pickMemdbLevel
func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
if maxLevel > 0 {
if len(v.levels) == 0 {
return maxLevel
}
if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
var overlaps tFiles
for ; level < maxLevel; level++ {
if pLevel := level + 1; pLevel >= len(v.levels) {
return maxLevel
} else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
break
}
if gpLevel := level + 2; gpLevel < len(v.levels) {
overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
break
}
}
}
}
}
return
}
tCompaction
func (db *DB) tCompaction() {
var (
x cCmd
waitQ []cCmd
)
...
for {
...
if x != nil {
switch cmd := x.(type) {
case cAuto:
if cmd.ackC != nil {
// Check the write pause state before caching it.
if db.resumeWrite() {
x.ack(nil)
} else {
waitQ = append(waitQ, x)
}
}
case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default:
panic("leveldb: unknown command")
}
x = nil
}
db.tableAutoCompaction()
}
}
tableRangeCompaction
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
db.logf("table@compaction range L%d %q:%q", level, umin, umax)
if level >= 0 {
if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
db.tableCompaction(c, true)
}
} else {
// 循环直到没有内容需要合并
// Retry until nothing to compact.
for {
compacted := false
// Scan for maximum level with overlapped tables.
v := db.s.version()
m := 1
for i := m; i < len(v.levels); i++ {
tables := v.levels[i]
if tables.overlaps(db.s.icmp, umin, umax, false) {
m = i
}
}
v.release()
for level := 0; level < m; level++ {
if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
db.tableCompaction(c, true)
compacted = true
}
}
if !compacted {
break
}
}
}
return nil
}
tableCompaction
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
defer c.release()
rec := &sessionRecord{}
rec.addCompPtr(c.sourceLevel, c.imax)
if !noTrivial && c.trivial() {
t := c.levels[0][0]
db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
rec.delTable(c.sourceLevel, t.fd.Num)
rec.addTableFile(c.sourceLevel+1, t)
db.compactionCommit("table-move", rec)
return
}
var stats [2]cStatStaging
for i, tables := range c.levels {
for _, t := range tables {
stats[i].read += t.size
// Insert deleted tables into record
rec.delTable(c.sourceLevel+i, t.fd.Num)
}
}
sourceSize := int(stats[0].read + stats[1].read)
minSeq := db.minSeq()
db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
b := &tableCompactionBuilder{
db: db,
s: db.s,
c: c,
rec: rec,
stat1: &stats[1],
minSeq: minSeq,
strict: db.s.o.GetStrict(opt.StrictCompaction),
tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
}
db.compactionTransact("table@build", b)
// Commit.
stats[1].startTimer()
db.compactionCommit("table", rec)
stats[1].stopTimer()
resultSize := int(stats[1].write)
db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
// Save compaction stats
for i := range stats {
db.compStats.addStat(c.sourceLevel+1, &stats[i])
}
switch c.typ {
case level0Compaction:
atomic.AddUint32(&db.level0Comp, 1)
case nonLevel0Compaction:
atomic.AddUint32(&db.nonLevel0Comp, 1)
case seekCompaction:
atomic.AddUint32(&db.seekComp, 1)
}
}