SinceNow.net

bitcask

phpangel   2024-08-03 19:09:51

package main

import (

    "encoding/binary"

    "hash/crc32"

    "io"

    "os"

    "sort"

    "sync"

)

/*

Entry:代表DB中一条数据的信息。

DataFiles:磁盘中的文件

Storage:与文件系统打交道的对象,包括了写入,读取数据。将数据写入到 DataFiles 中

Index,索引,记录一条数据的具体信息,主要是数据在磁盘中的位置。

DB,DB的实体。包含了DB的各种操作,包括读取,写入数据。

*/

type DB struct {

    rw      sync.RWMutex       // 读写锁

    kd      *index.KeyDir      // 内存index

    storage *storage.DataFiles // 文件

    opt     *Options           // 参数

}

type Entry struct {

    Key   []byte

    Value []byte

    Meta  *Meta

}

type Meta struct {

    Crc       uint32

    position  uint64

    TimeStamp uint64

    KeySize   uint32

    ValueSize uint32

    Flag      uint8

}

type DataFiles struct {

    dir         string // 数据目录

    oIds        []int  // old data files 的文件列表

    segmentSize int64

    active      *ActiveFile      // active data file 文件

    olds        map[int]*OldFile //  old data files 的文件列表

}

func (e *Entry) Encode() []byte {

    size := e.Size() // int64(MetaSize + e.Meta.KeySize + e.Meta.ValueSize)

    buf := make([]byte, size)

    binary.LittleEndian.PutUint64(buf[4:12], e.Meta.position)   // 这里并没有实际作用

    binary.LittleEndian.PutUint64(buf[12:20], e.Meta.TimeStamp) // 时间戳

    binary.LittleEndian.PutUint32(buf[20:24], e.Meta.KeySize)   // key 大小

    binary.LittleEndian.PutUint32(buf[24:28], e.Meta.ValueSize) // value 大小

    buf[28] = e.Meta.Flag

    if e.Meta.Flag != DeleteFlag {

        copy(buf[MetaSize:MetaSize+len(e.Key)], e.Key)

        copy(buf[MetaSize+len(e.Key):MetaSize+len(e.Key)+len(e.Value)], e.Value)

    }

    c32 := crc32.ChecksumIEEE(buf[4:])

    binary.LittleEndian.PutUint32(buf[0:4], c32) // 写入CRC

    return buf

}

/*

Set: db 暴露出来的接口,写入一个key value 值

*/

func (db *DB) Set(key []byte, value []byte) error {

    db.rw.Lock()

    defer db.rw.Unlock()

    entry := entity.NewEntryWithData(key, value) // Entry

    h, err := db.storage.WriterEntity(entry)     // 写入到文件

    if err != nil {

        return err

    }

    db.kd.AddIndexByData(h, entry) // 更新内存 index

    return nil

}

/*

WriterEntity: 写入 Entity 到文件

*/

func (af *ActiveFile) WriterEntity(e entity.Entity) (h *entity.Hint, err error) {

    buf := e.Encode()                    // 编码

    n, err := af.fd.WriteAt(buf, af.off) // 写入到 文件 offset

    if n < len(buf) {

        return nil, WriteMissDataErr

    }

    if err != nil {

        return nil, err

    }

    h = &entity.Hint{Fid: af.fid, Off: af.off}

    af.off += e.Size() // 更新文件下一个offset

    return h, nil

}

/*AddIndexByData: 更新内存 index*/

// DataPosition means a certain position of an entity.Entry which stores in disk.

type DataPosition struct {

    Fid       int   // 文件 fd

    Off       int64 // 所在文件的偏移量

    Timestamp uint64

    KeySize   int

    ValueSize int

}

func (kd *KeyDir) AddIndexByRawInfo(fid int, off int64, key, value []byte) {

    index := newDataPosition(fid, off, key, value)

    kd.Add(string(key), index) // 记录到内存

}

func newDataPosition(fid int, off int64, key, value []byte) *DataPosition {

    dp := &DataPosition{}

    dp.Fid = fid

    dp.Off = off

    dp.KeySize = len(key)

    dp.ValueSize = len(value)

    return dp

}

/*Get:db 暴露出来的接口,读取一个key的值

 */

// Get gets value by using key

func (db *DB) Get(key []byte) (value []byte, err error) {

    db.rw.RLock()

    defer db.rw.RUnlock()

    i := db.kd.Find(string(key))

    if i == nil {

        return nil, KeyNotFoundErr

    }

    entry, err := db.storage.ReadEntry(i)

    if err != nil {

        return nil, err

    }

    return entry.Value, nil

}

/*

ReadEntry:从文件中读取一个 Entry

*/

func (dfs *DataFiles) ReadEntry(index *index.DataPosition) (e *entity.Entry, err error) {

    dataSize := entity.MetaSize + index.KeySize + index.ValueSize

    if index.Fid == dfs.active.fid {

        return dfs.active.ReadEntity(index.Off, dataSize) // 从acive 文件中读取

    }

    of, exist := dfs.olds[index.Fid] // 从 old data file 中读取

    if !exist {

        return nil, MissOldFileErr

    }

    return of.ReadEntity(index.Off, dataSize)

}

func readEntry(fd *os.File, off int64, length int) (e *entity.Entry, err error) {

    buf := make([]byte, length)

    n, err := fd.ReadAt(buf, off) // 根据 key 在内存中读取问津啊对应的 offset 记录

    if n < length {

        return nil, ReadMissDataErr

    }

    if err != nil {

        return nil, err

    }

    e = entity.NewEntry()

    e.DecodeMeta(buf[:entity.MetaSize])

    e.DecodePayload(buf[entity.MetaSize:]) // 解码,从磁盘数据还原成写入时的数据

    return e, nil

}

/*

Marge 操作

Marge 操作的流程就是*/

// Merge clean the useless data

func (db *DB) Merge() error {

    db.rw.Lock()

    defer db.rw.Unlock()

    fids := db.storage.GetOldFiles()

    if len(fids) < 2 {

        return NoNeedToMergeErr

    }

    sort.Ints(fids)

    for _, fid := range fids[:len(fids)-1] {

        var off int64 = 0

        reader := db.storage.GetOldFile(fid) // 获取到old data file

        for {

            entry, err := reader.ReadEntityWithOutLength(off) // 读取一条数据

            if err == nil {

                key := string(entry.Key)

                off += entry.Size()

                oldIndex := db.kd.Find(key) // 看下当前内存 index中有没有数据,这条数据可能被删除了

                if oldIndex == nil {

                    continue

                }

                // oldIndex 是这个 key 最新的文件

                if oldIndex.IsEqualPos(fid, off) { // 比较一些当前读到的这个数据是不是最新的数据,如果是,则需要写入到 active data file 中

                    h, err := db.storage.WriterEntity(entry)

                    if err != nil {

                        return err

                    }

                    db.kd.AddIndexByData(h, entry) // 更新内存index

                }

            } else {

                if err == io.EOF {

                    break

                }

                return err

            }

        }

        err := db.storage.RemoveFile(fid) // 删除老的文件

        if err != nil {

            return err

        }

    }

    return nil

}

Related Articles

0 Comments

message
沪ICP备2024072411号 © 2022 SinceNow.net - GitHub
Login