Badger源码导读(一) – DB初始化

Badger源码导读

源码分析入口基准案例

先从Badger的基本使用入手

func main() {
    // 打开db
    db, _ := badger.Open(badger.DefaultOptions("tmp/badger"))
    defer db.Close()

    // 读写事务
    err := db.Update(func(txn *badger.Txn) error {
        txn.Set([]byte("answer"), []byte("42"))
        txn.Get([]byte("answer"))
        return nil
    })

    // 只读事务
    err = db.View(func(txn *badger.Txn) error {
        txn.Get([]byte("answer_v1"))
        return nil
    })

    // 遍历keys
    err = db.View(func(txn *badger.Txn) error {
        opts := badger.DefaultIteratorOptions
        opts.PrefetchSize = 10
        it := txn.NewIterator(opts)
        defer it.Close()
        for it.Rewind(); it.Valid(); it.Next() {
            item := it.Item()
            k := item.Key()
            err := item.Value(func(val []byte) error {
                fmt.Printf("key=%s, value=%s\n", k, val)
                return nil
            })
            if err != nil {
                return err
            }
        }
        return nil
    })
    err = db.RunValueLogGC(0.7)
    _ = err
}

DB初始化过程

初始化参数

badger.open()传入的是一个option,先看一下option结构体的字段都有哪些

type Options struct {
    // Required options.

    // Dir: Badger是KV分离的存储引擎,Dir位置存储的是 Key 和指向Value的逻辑指针
    // ValueDir: 存储的是Value日志,即值所在的地址,默认情况下Dir和ValueDir在同一个path目录下
    Dir      string
    ValueDir string

    // Usually modified options.

    // SyncWrites: 同步写,即写入的时候主动同步到磁盘(mmap不会立即刷盘)
    SyncWrites        bool
    NumVersionsToKeep int
    // ReadOnly: 如其名,是否设置为只读
    ReadOnly          bool
    // Logger: 如其名,log对象
    Logger            Logger
    // Compression: 压缩归并的级别
    Compression       options.CompressionType
    // InMemory: 是否只基于内存
    InMemory          bool
    MetricsEnabled    bool
    // Sets the Stream.numGo field
    NumGoroutines int

    // Fine tuning options.

    // MemTableSize: 内存表的尺寸限制
    MemTableSize        int64
    BaseTableSize       int64
    BaseLevelSize       int64
    LevelSizeMultiplier int
    TableSizeMultiplier int
    // MaxLevels: 最大容忍的level级别,LSM-T的级数L0-L(max-1)
    MaxLevels           int

    VLogPercentile float64
    // ValueThreshold: 值大小的阈值,如果Value的大小不超过这个设定值,则不会将KV进行分离
    // 此处是在工业实践中的一种权衡,KV分离会造成不可避免的读放大
    // (两次的随机读,先在LSM-T中读取一次指针,再通过指针从ValueLog中读取一次值)
    ValueThreshold int64
    // NumMemtables: 内存表的数量
    NumMemtables   int
    // Changing BlockSize across DB runs will not break badger. The block size is
    // read from the block index stored at the end of the table.
    // BlockSize: 每个block的大小(sst由block和index等组成)
    BlockSize          int
    // BloomFalsePositive: 布隆过滤器假阳性的比例
    BloomFalsePositive float64
    // BlockCacheSize: 块缓存的大小
    BlockCacheSize     int64
    // IndexCacheSize: 索引缓存的大小
    IndexCacheSize     int64

    NumLevelZeroTables      int
    NumLevelZeroTablesStall int

    // ValueLogFileSize: 存储值的Valuelog文件的最大大小
    ValueLogFileSize   int64
    // ValueLogMaxEntries: 存储值的Valuelog文件的最大键值对数量
    ValueLogMaxEntries uint32

    // NumCompactors: 日志合并压缩协程同时运行的最大数量
    NumCompactors        int
    CompactL0OnClose     bool
    LmaxCompaction       bool
    ZSTDCompressionLevel int

    // When set, checksum will be validated for each entry read from the value log file.
    // VerifyValueChecksum: 是否进行参数校验值的检查
    VerifyValueChecksum bool

    // Encryption related options.
    // EncryptionKey: 加密字段 
    EncryptionKey                 []byte        // encryption key
    // EncryptionKeyRotationDuration: 加密字段有效时长
    EncryptionKeyRotationDuration time.Duration // key rotation duration

    // BypassLockGuard will bypass the lock guard on badger. Bypassing lock
    // guard can cause data corruption if multiple badger instances are using
    // the same directory. Use this options with caution.
    BypassLockGuard bool

    // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
    ChecksumVerificationMode options.ChecksumVerificationMode

    // DetectConflicts determines whether the transactions would be checked for
    // conflicts. The transactions can be processed at a higher rate when
    // conflict detection is disabled.
    // DetectConflicts: 事务的冲突检测 
    DetectConflicts bool

    // NamespaceOffset specifies the offset from where the next 8 bytes contains the namespace.
    NamespaceOffset int

    // Transaction start and commit timestamps are managed by end-user.
    // This is only useful for databases built on top of Badger (like Dgraph).
    // Not recommended for most users.
    managedTxns bool

    // 4. Flags for testing purposes
    // ------------------------------
    // 有关批处理的参数
    maxBatchCount int64 // max entries in batch
    maxBatchSize  int64 // max batch size in bytes

    maxValueThreshold float64
}

传入指定的路径,并默认配置信息,如果有需要更改的信息可以使用 WithX() 方法(此处使用了建造者模式

badger.DefaultOptions("tmp/badger")

// DefaultOptions sets a list of recommended options for good performance.
// Feel free to modify these to suit your needs with the WithX methods.
func DefaultOptions(path string) Options {
    return Options{

        Dir:      path,
        ValueDir: path,

        MemTableSize:        64 << 20,
        BaseTableSize:       2 << 20,
        BaseLevelSize:       10 << 20,
        TableSizeMultiplier: 2,
        LevelSizeMultiplier: 10,
        MaxLevels:           7,
        NumGoroutines:       8,
        MetricsEnabled:      true,

        NumCompactors:           4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
        NumLevelZeroTables:      5,
        NumLevelZeroTablesStall: 15,
        NumMemtables:            5,
        BloomFalsePositive:      0.01,
        BlockSize:               4 * 1024,
        SyncWrites:              false,
        NumVersionsToKeep:       1,
        CompactL0OnClose:        false,
        VerifyValueChecksum:     false,
        Compression:             options.Snappy,
        BlockCacheSize:          256 << 20,
        IndexCacheSize:          0,

        // The following benchmarks were done on a 4 KB block size (default block size). The
        // compression is ratio supposed to increase with increasing compression level but since the
        // input for compression algorithm is small (4 KB), we don't get significant benefit at
        // level 3.
        // NOTE: The benchmarks are with DataDog ZSTD that requires CGO. Hence, no longer valid.
        // no_compression-16              10     502848865 ns/op     165.46 MB/s    -
        // zstd_compression/level_1-16     7     739037966 ns/op     112.58 MB/s    2.93
        // zstd_compression/level_3-16     7     756950250 ns/op     109.91 MB/s    2.72
        // zstd_compression/level_15-16    1    11135686219 ns/op      7.47 MB/s    4.38
        // Benchmark code can be found in table/builder_test.go file
        ZSTDCompressionLevel: 1,

        // Nothing to read/write value log using standard File I/O
        // MemoryMap to mmap() the value log files
        // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32.
        // -1 so 2*ValueLogFileSize won't overflow on 32-bit systems.
        ValueLogFileSize: 1<<30 - 1,

        ValueLogMaxEntries: 1000000,

        VLogPercentile: 0.0,
        ValueThreshold: maxValueThreshold,

        Logger:                        defaultLogger(INFO),
        EncryptionKey:                 []byte{},
        EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
        DetectConflicts:               true,
        NamespaceOffset:               -1,
    }
}

Open函数(核心)

badger.Open(opt) 函数

此方法代码过长,在此只保留核心部分代码,部分逻辑将以伪代码或注释表示,并省去部分错误处理逻辑

func Open(opt Options) (*DB, error) {
// 检查参数
checkAndSetOptions(&opt)
// 创建了三个目录锁,防止其他进程注册到同一个目录造成冲突
var dirLockGuard, valueDirLockGuard *directoryLockGuard
// Create directories and acquire lock on it only if badger is not running in InMemory mode.
// We don't have any directories/files in InMemory mode so we don't need to acquire
// any locks on them.
// 判断参数配置为只基于内存
if !opt.InMemory {
// 创建目录
createDirs(opt)
var err error
if !opt.BypassLockGuard {
// 给Dir加目录锁
dirLockGuard, _ = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
// 方法末尾释放锁
defer func() {
if dirLockGuard != nil {
_ = dirLockGuard.release()
}
}()
// 获取Key&ValuePtr的绝对路径
absDir, _ := filepath.Abs(opt.Dir)
// 获取ValueLog的绝对路径
absValueDir, _ := filepath.Abs(opt.ValueDir)
// 如果ValueDir和Dir不相同,需要各自加锁
if absValueDir != absDir {
// 给ValueDir加目录锁
valueDirLockGuard, _ = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
// 释放锁
defer func() {
if valueDirLockGuard != nil {
_ = valueDirLockGuard.release()
}
}()
}
}
}
// 打开或创建Manifest文件,(采用mmap方式打开,在后面详细展开)
manifestFile, manifest, _ := openOrCreateManifestFile(opt)
// 关闭Manifest文件
defer func() {
if manifestFile != nil {
_ = manifestFile.close()
}
}()
// 创建内存中的db数据结构
db := &DB{
// memtable, 因为有多个,所以要创建数组
imm:              make([]*memTable, 0, opt.NumMemtables),
// 刷新请求的channel
flushChan:        make(chan flushTask, opt.NumMemtables),
// 写请求的channel
writeCh:          make(chan *request, kvWriteChCapacity),
// 配置信息opt
opt:              opt,
// 刚初始化好的manifest实例
manifest:         manifestFile,
// Key&ValuePtr目录锁
dirLockGuard:     dirLockGuard,
// Value目录锁
valueDirGuard:    valueDirLockGuard,
// Oracle的实例,一个KV引擎并发事务的管理器,负责分配事务的版本号,用来实现MVCC功能,在读写事务时详细展开
orc:              newOracle(opt),
pub:              newPublisher(),
allocPool:        z.NewAllocatorPool(8),
bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
threshold:        initVlogThreshold(&opt),
}
// Cleanup all the goroutines started by badger in case of an error.
// 关闭badger的所有任务协程的钩子函数
defer func() {
if err != nil {
opt.Errorf("Received err: %v. Cleaning up...", err)
db.cleanup()
db = nil
}
}()
// 块缓存相关配置
// LSM-T结构中SST里面数据是以块(block)为单位分割的
// 当开启块缓存之后,LSM-T会把最近被访问到的高热的块缓存在内存中,以加块响应速度
if opt.BlockCacheSize > 0 {
// 缓存不在此次源码阅读的讨论范围之内,不影响核心功能,暂且略过
// 值得一提的是badger是使用的缓存是badger社区研发的一个高性能本地并发缓存的库,有兴趣的同学可以自行研究
numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
if numInCache == 0 {
// Make the value of this variable at least one since the cache requires
// the number of counters to be greater than zero.
numInCache = 1
}
config := ristretto.Config{
NumCounters: numInCache * 8,
MaxCost:     opt.BlockCacheSize,
BufferItems: 64,
Metrics:     true,
OnExit:      table.BlockEvictHandler,
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, y.Wrap(err, "failed to create data cache")
}
}
// 索引缓存相关配置
// 索引是每个Key所对应的偏离量的值,每一个SSTable有一个元数据块即索引块
// 可以方便对Key的二分查找,定位当前的key在哪一个sstable文件里,在文件中的偏移量是多少
if opt.IndexCacheSize > 0 {
// Index size is around 5% of the table size.
indexSz := int64(float64(opt.MemTableSize) * 0.05)
numInCache := opt.IndexCacheSize / indexSz
if numInCache == 0 {
// Make the value of this variable at least one since the cache requires
// the number of counters to be greater than zero.
numInCache = 1
}
config := ristretto.Config{
NumCounters: numInCache * 8,
MaxCost:     opt.IndexCacheSize,
BufferItems: 64,
Metrics:     true,
}
db.indexCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, y.Wrap(err, "failed to create bf cache")
}
}
// 对缓存模块的监控检测
db.closers.cacheHealth = z.NewCloser(1)
go db.monitorCache(db.closers.cacheHealth)
// 如果仅基于内存
if db.opt.InMemory {
// 默认关闭写同步
db.opt.SyncWrites = false
// If badger is running in memory mode, push everything into the LSM Tree.
// 把所有数据只写在LSM-T中
db.opt.ValueThreshold = math.MaxInt32
}
// Key的注册,与并发事务相关,之后再详细展开
krOpt := KeyRegistryOptions{
ReadOnly:                      opt.ReadOnly,
Dir:                           opt.Dir,
EncryptionKey:                 opt.EncryptionKey,
EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
InMemory:                      opt.InMemory,
}
db.registry, _ = OpenKeyRegistry(krOpt)
// 计算消耗的内存等数据统计信息
db.calculateSize()
db.closers.updateSize = z.NewCloser(1)
go db.updateSize(db.closers.updateSize)
// 打开一个memTable实例
// memtable是在内存中的一个复杂数据结构
if err := db.openMemTables(db.opt); err != nil {
return nil, y.Wrapf(err, "while opening memtables")
}
// 检查
if !db.opt.ReadOnly {
// 创建一个新的.mem文件
// .mem文件就是LSM-T中的预写日志文件(wal)
if db.mt, err = db.newMemTable(); err != nil {
return nil, y.Wrapf(err, "cannot create memtable")
}
}
// newLevelsController potentially loads files in directory.
// 创建内存中level管理器
// LSM-T是分层结构的, LevelsController实例负责维护整个层级结构
// 进行日志归并,压缩处理等操作,通过Manifest进行初始配置
// 或者是,manifest文件就是LevelController持久化之后的ondisk版本,可以加快badger的恢复重启速度
// 先打开SSTable,加载索引块,元数据块,缓存到内存当中
if db.lc, err = newLevelsController(db, &manifest); err != nil {
return db, err
}
// Initialize vlog struct.
// 初始化vlog
db.vlog.init(db)
if !opt.ReadOnly {
// 启动日志归并的工作协程,后续再展开
db.closers.compactors = z.NewCloser(1)
db.lc.startCompact(db.closers.compactors)
db.closers.memtable = z.NewCloser(1)
go func() {
_ = db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
}()
// Flush them to disk asap.
for _, mt := range db.imm {
db.flushChan <- flushTask{mt: mt}
}
}
// We do increment nextTxnTs below. So, no need to do it here.
// 拿到启动时最大事务的版本号(时间戳)
db.orc.nextTxnTs = db.MaxVersion()
db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
// 真正打开vlog文件
if err = db.vlog.open(db); err != nil {
return db, y.Wrapf(err, "During db.vlog.open")
}
// Let's advance nextTxnTs to one more than whatever we observed via
// replaying the logs.
// 事务相关,等待之前事务的恢复
db.orc.txnMark.Done(db.orc.nextTxnTs)
// In normal mode, we must update readMark so older versions of keys can be removed during
// compaction when run in offline mode via the flatten tool.
db.orc.readMark.Done(db.orc.nextTxnTs)
// 事务号自增
db.orc.incrementNextTs()
// 监听配置信息的更改
go db.threshold.listenForValueThresholdUpdate()
// 从数据库中检索被禁止的命名空间并更新内存结构(非重点)
if err := db.initBannedNamespaces(); err != nil {
return db, errors.Wrapf(err, "While setting banned keys")
}
// 启动处理磁盘写请求的协程
// badger的写任务是并发写任务,可以充分发挥ssd的性能
db.closers.writes = z.NewCloser(1)
go db.doWrites(db.closers.writes)
if !db.opt.InMemory {
// 真正开启vlog的GC, 后面再详细讲解
db.closers.valueGC = z.NewCloser(1)
go db.vlog.waitOnGC(db.closers.valueGC)
}
// 监听协程(非重点)
db.closers.pub = z.NewCloser(1)
go db.pub.listenForUpdates(db.closers.pub)
// 释放锁
valueDirLockGuard = nil
dirLockGuard = nil
manifestFile = nil
// 返回db
return db, nil
}

创建Manifest文件

openOrCreateManifestFile(opt) 函数

func openOrCreateManifestFile(opt Options) (ret *manifestFile, result Manifest, err error) {
// 如果Inmemory则返回空的Manifest
if opt.InMemory {
return &manifestFile{inMemory: true}, Manifest{}, nil
}
return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, manifestDeletionsRewriteThreshold)
}
func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (*manifestFile, Manifest, error) {
// 拼接path
path := filepath.Join(dir, ManifestFilename)
var flags y.Flags
if readOnly {
flags |= y.ReadOnly
}
// 尝试打开文件
fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
if err != nil {
// 校验文件是否存在
if !os.IsNotExist(err) {
return nil, Manifest{}, err
}
// 如果仅读则无法创建直接返回
if readOnly {
return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
}
// 真正创建manifest实例
m := createManifest()
// 覆盖写,执行完此条语句后就可以在目录中看到MANIFEST文件存在了(此时MANIFEST文件中仅有魔数bdg)
fp, netCreations, _ := helpRewrite(dir, &m)
// 断言,确保创建成功
y.AssertTrue(netCreations == 0)
// 创建manifestFile实例在内存中保存信息
mf := &manifestFile{
fp:                        fp,
directory:                 dir,
manifest:                  m.clone(),
deletionsRewriteThreshold: deletionsThreshold,
}
return mf, m, nil
}
// 文件存在加载恢复的逻辑暂不展开
......
}
func createManifest() Manifest {
levels := make([]levelManifest, 0)
return Manifest{
Levels: levels,
Tables: make(map[uint64]TableManifest),
}
// Tables: map[uint64]TableManifest
// uint64: 行号,第n个level
}

打开Memtable

memtable 结构体

// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
type memTable struct {
sl         *skl.Skiplist
wal        *logFile
maxVersion uint64
opt        Options
buf        *bytes.Buffer
}

openMemTables(opt) 方法

func (db *DB) openMemTables(opt Options) error {
// We don't need to open any tables in in-memory mode.
// 如果是只基于内存则直接返回(那我走?)
if db.opt.InMemory {
return nil
}
// 读取目录中的全部文件
files, _ := ioutil.ReadDir(db.opt.Dir)
var fids []int
// 遍历目录中的文件
for _, file := range files {
// 检查当前文件名是否包含一个.mem的后缀(在第一次初始化过程中肯定不会存在)
// 此时目录中应有的文件为 LOCK MANIFEST KEYREGISTRY
if !strings.HasSuffix(file.Name(), memFileExt) {
continue
}
// 如果有.mem文件,则取文件的命名转为int值作为fid
// 例: 000001.mem 000002.mem
fsz := len(file.Name())
fid, _ := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
fids = append(fids, int(fid))
}
// Sort in ascending order.
// 按照fid排序
sort.Slice(fids, func(i, j int) bool {
return fids[i] < fids[j]
})
// 按照fid顺序遍历
for _, fid := range fids {
flags := os.O_RDWR
if db.opt.ReadOnly {
flags = os.O_RDONLY
}
// 真正的打开.mem文件,采用mmap方式加载.mem文件中的数据
mt, err := db.openMemTable(fid, flags)
if err != nil {
return y.Wrapf(err, "while opening fid: %d", fid)
}
// If this memtable is empty we don't need to add it. This is a
// memtable that was completely truncated.
if mt.sl.Empty() {
mt.DecrRef()
continue
}
// These should no longer be written to. So, make them part of the imm.
db.imm = append(db.imm, mt)
}
// 设置最新的fid序列号
if len(fids) != 0 {
db.nextMemFid = fids[len(fids)-1]
}
db.nextMemFid++
return nil
}

创建Memtable

newMemTable() 方法

func (db *DB) newMemTable() (*memTable, error) {
// 真正创建.mem文件
mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
if err == z.NewFile {
db.nextMemFid++
return mt, nil
}
if err != nil {
db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
return nil, y.Wrapf(err, "newMemTable")
}
return nil, errors.Errorf("File %s already exists", mt.wal.Fd.Name())
}

openMemTable(fid, flags) 方法

func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
// 拼接路径
filepath := db.mtFilePath(fid)
// 创建memtable中的skiplist
s := skl.NewSkiplist(arenaSize(db.opt))
// 创建memtable实例
mt := &memTable{
sl:  s,
opt: db.opt,
buf: &bytes.Buffer{},
}
// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
// 如果只基于内存,则不需要创建wal文件,直接返回
if db.opt.InMemory {
return mt, z.NewFile
}
// 创建wal文件实例
mt.wal = &logFile{
fid:      uint32(fid),
path:     filepath,
registry: db.registry,
writeAt:  vlogHeaderSize,
opt:      db.opt,
}
// 调用系统函数创建wal文件
lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
// 如果未成功创建新文件或其他失败则返回err
if lerr != z.NewFile && lerr != nil {
return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
}
// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
// when it gets flushed to L0.
// 用来关闭的回调函数
s.OnClose = func() {
if err := mt.wal.Delete(); err != nil {
db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
}
}
// 成功创建mmap则返回 lerr (z.NewFile)
if lerr == z.NewFile {
return mt, lerr
}
// 当且仅当MemTableSize设置为0时造成 lerr == nil的适合执行到此
// 此时mmap未进行截断,在UpdateSkipList()中遍历wal文件并重新截断,如果wal文件不存在会返回错误
err := mt.UpdateSkipList()
return mt, y.Wrapf(err, "while updating skiplist")
}

创建levelController

newLevelsController(db, mf) 函数

func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
// 断言,进行一些必要的校验
y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
// 关联db实例,创建level数组对应层级关系(例:levels[0] => L0层)
// levelHandler就是真正负责某一层sst管理器的主要操作
s := &levelsController{
kv:     db,
levels: make([]*levelHandler, db.opt.MaxLevels),
}
// 状态统计的一个的对象(set结构),key为fid,用以判断对应的fid是否存在于这一层
s.cstatus.tables = make(map[uint64]struct{})
// 合并状态的信息
s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
// 按层遍历,每一层都创建一个levelhandler实例
for i := 0; i < db.opt.MaxLevels; i++ {
s.levels[i] = newLevelHandler(db, i)
s.cstatus.levels[i] = new(levelCompactStatus)
}
// 基于内存,那我走?🤡
if db.opt.InMemory {
return s, nil
}
// Compare manifest against directory, check for existent/non-existent files, and remove.
// 对manifest文件进行校验
if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
return nil, err
}
var mu sync.Mutex
tables := make([][]*table.Table, db.opt.MaxLevels)
var maxFileID uint64
// We found that using 3 goroutines allows disk throughput to be utilized to its max.
// Disk utilization is the main thing we should focus on, while trying to read the data. That's
// the one factor that remains constant between HDD and SSD.
// 一种针对并发控制的负载均衡策略,对于ssd来说,创建3个协程能够最大的发挥ssd的优点
throttle := y.NewThrottle(3)
start := time.Now()
var numOpened int32
// 创建一个定时触发器进行超时控制
tick := time.NewTicker(3 * time.Second)
// 钩子函数关闭定时器
defer tick.Stop()
// manifest清单文件的Tables
// 拿到每个table对应的fid
// 第一次初始化的适合因为Tables为空,会直接跳过
for fileID, tf := range mf.Tables {
fname := table.NewFilename(fileID, db.opt.Dir)
select {
case <-tick.C:
db.opt.Infof("%d tables out of %d opened in %s\n", atomic.LoadInt32(&numOpened),
len(mf.Tables), time.Since(start).Round(time.Millisecond))
default:
}
if err := throttle.Do(); err != nil {
closeAllTables(tables)
return nil, err
}
if fileID > maxFileID {
maxFileID = fileID
}
go func(fname string, tf TableManifest) {
var rerr error
defer func() {
throttle.Done(rerr)
atomic.AddInt32(&numOpened, 1)
}()
dk, err := db.registry.DataKey(tf.KeyID)
if err != nil {
rerr = y.Wrapf(err, "Error while reading datakey")
return
}
topt := buildTableOptions(db)
// Explicitly set Compression and DataKey based on how the table was generated.
topt.Compression = tf.Compression
topt.DataKey = dk
mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
if err != nil {
rerr = y.Wrapf(err, "Opening file: %q", fname)
return
}
t, err := table.OpenTable(mf, topt)
if err != nil {
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
db.opt.Errorf(err.Error())
db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
// Do not set rerr. We will continue without this table.
} else {
rerr = y.Wrapf(err, "Opening table: %q", fname)
}
return
}
mu.Lock()
tables[tf.Level] = append(tables[tf.Level], t)
mu.Unlock()
}(fname, tf)
}
// 关闭相关的任务协程
if err := throttle.Finish(); err != nil {
closeAllTables(tables)
return nil, err
}
db.opt.Infof("All %d tables opened in %s\n", atomic.LoadInt32(&numOpened),
time.Since(start).Round(time.Millisecond))
// 记录当前fid最大值
s.nextFileID = maxFileID + 1
// 初始化每个level的tables
for i, tbls := range tables {
s.levels[i].initTables(tbls)
}
// Make sure key ranges do not overlap etc.
// 必要的数据校验
if err := s.validate(); err != nil {
_ = s.cleanupLevels()
return nil, y.Wrap(err, "Level validation")
}
// Sync directory (because we have at least removed some files, or previously created the
// manifest file).
// 手动进行同步刷盘
if err := syncDir(db.opt.Dir); err != nil {
_ = s.close()
return nil, err
}
return s, nil
}
创建levelHandler

newLevelHandler(db, level) 函数

func newLevelHandler(db *DB, level int) *levelHandler {
return &levelHandler{
level:    level,
strLevel: fmt.Sprintf("l%d", level),
db:       db,
}
}
初始化tables

initTables(tables) 方法

// initTables replaces s.tables with given tables. This is done during loading.
func (s *levelHandler) initTables(tables []*table.Table) {
// 加锁
s.Lock()
defer s.Unlock()
// 赋值与相关值的初始化
s.tables = tables
s.totalSize = 0
s.totalStaleSize = 0
for _, t := range tables {
s.addSize(t)
}
// 如果是L0层,需要拿每个fid排序
if s.level == 0 {
// Key range will overlap. Just sort by fileID in ascending order
// because newer tables are at the end of level 0.
sort.Slice(s.tables, func(i, j int) bool {
return s.tables[i].ID() < s.tables[j].ID()
})
} else {
// L0层往上,拿每个table文件的MinKey排序
// Sort tables by keys.
sort.Slice(s.tables, func(i, j int) bool {
return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
})
}
}

初始化vlog

init(db) 方法

// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
// 加载配置
vlog.opt = db.opt
vlog.db = db
// We don't need to open any vlog files or collect stats for GC if DB is opened
// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
// inmem,那我走?🤡
if vlog.opt.InMemory {
return
}
// 指定的vlog目录
vlog.dirPath = vlog.opt.ValueDir
// GC模块用到的channel
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
// 创建一个GC模块相关文件
lf, err := InitDiscardStats(vlog.opt)
y.Check(err)
vlog.discardStats = lf
}

打开vlog

open(db) 方法

func (vlog *valueLog) open(db *DB) error {
// We don't need to open any vlog files or collect stats for GC if DB is opened
// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
// 不想再做解释了,inmem,那我走!!!
if db.opt.InMemory {
return nil
}
// 填充文件fid到filesMap
if err := vlog.populateFilesMap(); err != nil {
return err
}
// If no files are found, then create a new file.
// 如果没有.vlog文件
if len(vlog.filesMap) == 0 {
if vlog.opt.ReadOnly {
return nil
}
// 创建一个.vlog文件
_, err := vlog.createVlogFile()
return y.Wrapf(err, "Error while creating log file in valueLog.open")
}
fids := vlog.sortedFids()
for _, fid := range fids {
lf, ok := vlog.filesMap[fid]
y.AssertTrue(ok)
// Just open in RDWR mode. This should not create a new log file.
lf.opt = vlog.opt
if err := lf.open(vlog.fpath(fid), os.O_RDWR,
2*vlog.opt.ValueLogFileSize); err != nil {
return y.Wrapf(err, "Open existing file: %q", lf.path)
}
// We shouldn't delete the maxFid file.
if lf.size == vlogHeaderSize && fid != vlog.maxFid {
vlog.opt.Infof("Deleting empty file: %s", lf.path)
if err := lf.Delete(); err != nil {
return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
}
delete(vlog.filesMap, fid)
}
}
if vlog.opt.ReadOnly {
return nil
}
// Now we can read the latest value log file, and see if it needs truncation. We could
// technically do this over all the value log files, but that would mean slowing down the value
// log open.
last, ok := vlog.filesMap[vlog.maxFid]
y.AssertTrue(ok)
lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
func(_ Entry, vp valuePointer) error {
return nil
})
if err != nil {
return y.Wrapf(err, "while iterating over: %s", last.path)
}
if err := last.Truncate(int64(lastOff)); err != nil {
return y.Wrapf(err, "while truncating last value log file: %s", last.path)
}
// Don't write to the old log file. Always create a new one.
if _, err := vlog.createVlogFile(); err != nil {
return y.Wrapf(err, "Error while creating log file in valueLog.open")
}
return nil
}

populateFilesMap() 方法

func (vlog *valueLog) populateFilesMap() error {
vlog.filesMap = make(map[uint32]*logFile)
// 从目录中拿到每个文件的句柄
files, _ := ioutil.ReadDir(vlog.dirPath)
found := make(map[uint64]struct{})
for _, file := range files {
// 判断是否以.vlog作为后缀
if !strings.HasSuffix(file.Name(), ".vlog") {
continue
}
// 对.vlog文件进行校验,去除fid,进行消重判断
fsz := len(file.Name())
fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
if err != nil {
return errFile(err, file.Name(), "Unable to parse log id.")
}
if _, ok := found[fid]; ok {
return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
}
found[fid] = struct{}{}
lf := &logFile{
fid:      uint32(fid),
path:     vlog.fpath(uint32(fid)),
registry: vlog.db.registry,
}
// 最后保存到vlog的filesMap当中
vlog.filesMap[uint32(fid)] = lf
if vlog.maxFid < uint32(fid) {
vlog.maxFid = uint32(fid)
}
}
// 直到每个.vlog文件的fid都添加到了map中
// 第一次初始化时没有.vlog文件,故直接跳过
return nil
}
创建vlog文件

createVlogFile() 方法

func (vlog *valueLog) createVlogFile() (*logFile, error) {
// 最大的fid
fid := vlog.maxFid + 1
// 根据fid命名
path := vlog.fpath(fid)
// 创建一个句柄实例
lf := &logFile{
fid:      fid,
path:     path,
registry: vlog.db.registry,
writeAt:  vlogHeaderSize,
opt:      vlog.opt,
}
// 进行系统调用打开文件,通过mmap的方式
// .vlog文件初始化时会创建一个2G的文件
err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
if err != z.NewFile && err != nil {
return nil, err
}
// 进行数据初始化更新的操作
vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
y.AssertTrue(vlog.maxFid < fid)
vlog.maxFid = fid
// writableLogOffset is only written by write func, by read by Read func.
// To avoid a race condition, all reads and updates to this variable must be
// done via atomics.
atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize)
vlog.numEntriesWritten = 0
vlog.filesLock.Unlock()
return lf, nil
}
暂无评论

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇