4

Ethereum以太坊源码分析(四)ethdb数据库

 2 years ago
source link: https://dinghaoli.github.io/2018/11/ETH_code_4/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Ethereum以太坊源码分析(四)ethdb数据库

calendar.png2018-11-20 | 阅读:次

本文参考:

Github - go-ethereum-code-analysis
有关NoSQL Compaction策略的一点思考


根据之前的代码分析我们可以知道,以太坊的存储数据结构为MPT(Merkle Patricia Tries),一切数据都为键值对(KeyValue)。为了存储(序列化)这些健值对,以太坊采用LevelDB为原型来定制自己的数据库ethdb。

go-ethereum所有的数据存储在levelDB这个Google开源的KeyValue文件数据库中,整个区块链的所有数据都存储在一个levelDB的数据库中,levelDB支持按照文件大小切分文件的功能,所以我们看到的区块链的数据都是一个一个小文件,其实这些小文件都是同一个levelDB实例。这里简单的看下levelDB的go封装代码。

LevelDB官方网站介绍的特点

特点:

  • key和value都是任意长度的字节数组;
  • entry(即一条K-V记录)默认是按照key的字典顺序存储的,当然开发者也可以重载这个排序函数;
  • 提供的基本操作接口:Put()、Delete()、Get()、Batch();
  • 支持批量操作以原子操作进行;
  • 可以创建数据全景的snapshot(快照),并允许在快照中查找数据;
  • 可以通过前向(或后向)迭代器遍历数据(迭代器会隐含的创建一个snapshot);
  • 自动使用Snappy压缩数据;
  • 可移植性;

限制:

  • 非关系型数据模型(NoSQL),不支持sql语句,也不支持索引;
  • 一次只允许一个进程访问一个特定的数据库;
  • 没有内置的C/S架构,但开发者可以使用LevelDB库自己封装一个server;

源码所在的目录在ethereum/ethdb目录。代码比较简单, 分为下面四个文件:

database.go             levelDB的封装代码
memory_database.go      供测试用的基于内存的数据库,不会持久化为文件,仅供测试
interface.go            定义了数据库的接口
database_test.go        测试案例

interface.go

这个文件用于暴露数据库的接口,看下面的代码,基本上定义了KeyValue数据库的基本操作,Put,Get,Has,Delete等基本操作,levelDB是不支持SQL的,基本可以理解为数据结构里面的Map。Database接口中的函数都是线程安全的,不用加锁, 而Batch(批量处理)接口中的函数则不是线程安全的。

// go-ethereum/ethdb/interface.go

package ethdb

// Code using batches should try to add this much data to the batch.
// The value was determined empirically.
const IdealBatchSize = 100 * 1024

// Putter wraps the database write operation supported by both batches and regular databases.
// Putter接口定义了批量操作和普通操作的写入接口
type Putter interface {
    Put(key []byte, value []byte) error
}

// Deleter wraps the database delete operation supported by both batches and regular databases.
type Deleter interface {
    Delete(key []byte) error
}

// Database wraps all database operations. All methods are safe for concurrent use.
// 数据库接口定义了所有的数据库操作, 所有的方法都是多线程安全的。
type Database interface {
    Putter
    Deleter
    Get(key []byte) ([]byte, error)
    Has(key []byte) (bool, error)
    Close()
    NewBatch() Batch
}

// Batch is a write-only database that commits changes to its host database
// when Write is called. Batch cannot be used concurrently.
// 批量操作接口,不能多线程同时使用,当Write方法被调用的时候,数据库会提交写入的更改。
type Batch interface {
    Putter
    Deleter
    ValueSize() int // amount of data in the batch
    Write() error
    // Reset resets the batch for reuse
    Reset()
}

memory_database.go

MemDatabase为,供测试用的基于内存的数据库,不会持久化为文件,不能用于生产环境。我们可以很清楚地看到结构就是一个map,而且只在内存中,操作也不是线程安全的,各种操作都加了Lock。

Batch的操作也很清晰明了,首先,kv的结构中包含{k, v, del},k就是key、v就是value,del是一个Bool值,若为真,就代表要删除这个keyValue对,反之,就是写入。

由memBatch的结构我们可以知道,它维护一个kv数组,里面存储的都是待操作的kv对。*memBatch的write方法用于对一批kv(存在kv数组中)进行操作(写入还是删除由kv结构中的del值来决定),Put用于给kv数组增加一个需要写的kv,Delete用于对kv数组增加一个需要删除的kv。

// go-ethereum/ethdb/memory_database.go

/*
 * This is a test memory database. Do not use for any production it does not get persisted
 */
type MemDatabase struct {
    db   map[string][]byte
    lock sync.RWMutex
}

func NewMemDatabase() *MemDatabase {
    return &MemDatabase{
        db: make(map[string][]byte),
    }
}

...

func (db *MemDatabase) Put(key []byte, value []byte) error {
    // 锁机制
    db.lock.Lock()
    defer db.lock.Unlock()

    db.db[string(key)] = common.CopyBytes(value)
    return nil
}

func (db *MemDatabase) Has(key []byte) (bool, error) {
    // 加读锁
    db.lock.RLock()
    defer db.lock.RUnlock()

    _, ok := db.db[string(key)]
    return ok, nil
}

...


func (db *MemDatabase) NewBatch() Batch {
    return &memBatch{db: db}
}

func (db *MemDatabase) Len() int { return len(db.db) }

type kv struct {
    k, v []byte
    del  bool
}

type memBatch struct {
    db     *MemDatabase
    // 批量处理的待操作列表
    writes []kv
    size   int
}

func (b *memBatch) Put(key, value []byte) error {
    // 把操作加入kv待处理列表中
    b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value), false})
    b.size += len(value)
    return nil
}

func (b *memBatch) Delete(key []byte) error {
    b.writes = append(b.writes, kv{common.CopyBytes(key), nil, true})
    b.size += 1
    return nil
}

func (b *memBatch) Write() error {
    b.db.lock.Lock()
    defer b.db.lock.Unlock()

    for _, kv := range b.writes {
        if kv.del {
            delete(b.db.db, string(kv.k))
            continue
        }
        b.db.db[string(kv.k)] = kv.v
    }
    return nil
}

database.go

这个是实际ethereum客户端使用的代码,封装了levelDB的接口。

// go-ethereum/ethdb/database.go

package ethdb

import (
    "fmt"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/metrics"
    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/errors"
    "github.com/syndtr/goleveldb/leveldb/filter"
    "github.com/syndtr/goleveldb/leveldb/iterator"
    "github.com/syndtr/goleveldb/leveldb/opt"
    "github.com/syndtr/goleveldb/leveldb/util"
)

使用了github.com/syndtr/goleveldb/leveldb的leveldb的封装,所以一些使用的文档可以在那里找到。

可以看到,数据结构主要增加了很多的Mertrics用来记录数据库的使用情况,增加了quitChan用来处理停止时候的一些情况,这个后面会分析。如果下面代码可能有疑问的地方应该再Filter: filter.NewBloomFilter(10)这个可以暂时不用关注,这个是levelDB里面用来进行性能优化的一个选项,可以不用理会。Mertrics当中很数用来记录compaction的相关数据。

现在的NoSQL数据库必须要有compaction操作,我们来看看compaction的定义:

In telecommunication, data compaction is the reduction of the number of data elements, bandwidth, cost, and time for the generation, transmission, and storage of data without loss of information by eliminating unnecessary redundancy, removing irrelevancy, or using special coding.

主要的宗旨就是在传输过程中节省开销。那对应到存储领域,就是要节省存储开销,节省读取开销的操作。compaction具体细节可以参考这篇文章有关NoSQL Compaction策略的一点思考

// go-ethereum/ethdb/database.go

type LDBDatabase struct {
    fn string      // filename for reporting
    // 直接使用LevelDB来进行存储
    db *leveldb.DB // LevelDB instance

    // 增加了很多Mertrics用于记录数据库的使用情况
    compTimeMeter    metrics.Meter // Meter for measuring the total time spent in database compaction
    compReadMeter    metrics.Meter // Meter for measuring the data read during compaction
    compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
    writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
    writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
    diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
    diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written

    quitLock sync.Mutex      // Mutex protecting the quit channel access
    quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

    log log.Logger // Contextual logger tracking the database path
}

// NewLDBDatabase returns a LevelDB wrapped object.
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
    logger := log.New("database", file)

    // Ensure we have some minimal caching and file guarantees
    if cache < 16 {
        cache = 16
    }
    if handles < 16 {
        handles = 16
    }
    logger.Info("Allocated cache and file handles", "cache", cache, "handles", handles)

    // Open the db and recover any potential corruptions
    db, err := leveldb.OpenFile(file, &opt.Options{
        OpenFilesCacheCapacity: handles,
        BlockCacheCapacity:     cache / 2 * opt.MiB,
        WriteBuffer:            cache / 4 * opt.MiB, // Two of these are used internally
        Filter:                 filter.NewBloomFilter(10),
    })
    if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
        db, err = leveldb.RecoverFile(file, nil)
    }
    // (Re)check for errors and abort if opening of the db failed
    if err != nil {
        return nil, err
    }
    return &LDBDatabase{
        fn:  file,
        db:  db,
        log: logger,
    }, nil
}

再看看下面的Put和Has的代码,因为github.com/syndtr/goleveldb/leveldb封装之后的代码是支持多线程同时访问的,所以下面这些代码是不用使用锁来保护的,这个可以注意一下。这里面大部分的代码都是直接调用leveldb的封装,所以不详细介绍了。 有一个比较有意思的地方是Metrics代码。

// go-ethereum/ethdb/database.go

// Path returns the path to the database directory.
func (db *LDBDatabase) Path() string {
    return db.fn
}

// Put puts the given key / value to the queue
func (db *LDBDatabase) Put(key []byte, value []byte) error {
    return db.db.Put(key, value, nil)
}

func (db *LDBDatabase) Has(key []byte) (bool, error) {
    return db.db.Has(key, nil)
}

// Get returns the given key if it's present.
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
    dat, err := db.db.Get(key, nil)
    if err != nil {
        return nil, err
    }
    return dat, nil
}

Metrics的处理

之前在创建NewLDBDatabase的时候,并没有初始化内部的很多Mertrics,这个时候Mertrics是为nil的。初始化Mertrics是在Meter方法中。外部传入了一个prefix参数,然后创建了各种Mertrics(具体如何创建Merter,会后续在Meter专题进行分析),然后创建了quitChan。 最后启动了一个线程调用了db.meter方法。

// go-ethereum/ethdb/database.go

// Meter configures the database metrics collectors and
func (db *LDBDatabase) Meter(prefix string) {
    // Initialize all the metrics collector at the requested prefix
    // 初始化metrics的各种参数
    db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
    db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
    db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
    db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
    db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
    db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
    db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)

    // Create a quit channel for the periodic collector and run it
    db.quitLock.Lock()
    db.quitChan = make(chan chan error)
    db.quitLock.Unlock()

    go db.meter(3 * time.Second)
}

这个方法每3秒钟获取一次leveldb内部的计数器,然后把他们公布到metrics子系统。 这是一个无限循环的方法, 直到quitChan收到了一个退出信号。很多对信息的处理。

主要来说都是先用LevelDB原本的的接口GetProperty()来获取各种stats,之后对其进行处理。

// go-ethereum/ethdb/database.go

// meter periodically retrieves internal leveldb counters and reports them to
// the metrics subsystem.
//
// This is how a stats table look like (currently):
// 下面的注释就是我们调用 db.db.GetProperty("leveldb.stats")返回的字符串,后续的代码需要解析这个字符串并把信息写入到Meter中。
//
//   Compactions
//    Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)
//   -------+------------+---------------+---------------+---------------+---------------
//      0   |          0 |       0.00000 |       1.27969 |       0.00000 |      12.31098
//      1   |         85 |     109.27913 |      28.09293 |     213.92493 |     214.26294
//      2   |        523 |    1000.37159 |       7.26059 |      66.86342 |      66.77884
//      3   |        570 |    1113.18458 |       0.00000 |       0.00000 |       0.00000
//
// This is how the write delay look like (currently):
// DelayN:5 Delay:406.604657ms Paused: false
//
// This is how the iostats look like (currently):
// Read(MB):3895.04860 Write(MB):3654.64712
func (db *LDBDatabase) meter(refresh time.Duration) {
    // Create the counters to store current and previous compaction values
    compactions := make([][]float64, 2)
    for i := 0; i < 2; i++ {
        compactions[i] = make([]float64, 3)
    }
    // Create storage for iostats.
    var iostats [2]float64

    // Create storage and warning log tracer for write delay.
    var (
        delaystats      [2]int64
        lastWritePaused time.Time
    )

    var (
        errc chan error
        merr error
    )

    // Iterate ad infinitum and collect the stats
    // 无限循环的去记录stats,除非db.quitChan获得关闭信号
    for i := 1; errc == nil && merr == nil; i++ {
        // Retrieve the database stats
        // 调用LevelDB本身的接口来获得新信息
        stats, err := db.db.GetProperty("leveldb.stats")
        if err != nil {
            db.log.Error("Failed to read database stats", "err", err)
            merr = err
            continue
        }
        // Find the compaction table, skip the header
        lines := strings.Split(stats, "\n")
        
        ...

        // 之后都是对获取的信息进行处理

        // Retrieve the write delay statistic
        // 获取写操作的延迟信息
        writedelay, err := db.db.GetProperty("leveldb.writedelay")
        if err != nil {
            db.log.Error("Failed to read database write delay statistic", "err", err)
            merr = err
            continue
        }
        
        ...

        // 之后都是对获取的信息进行处理

        // Retrieve the database iostats.
        // 获取数据库IO的相关数据
        ioStats, err := db.db.GetProperty("leveldb.iostats")
        if err != nil {
            db.log.Error("Failed to read database iostats", "err", err)
            merr = err
            continue
        }
        
        ...

        // 之后都是对获取的信息进行处理

        // Sleep a bit, then repeat the stats collection
        select {
        case errc = <-db.quitChan:
            // Quit requesting, stop hammering the database
            // 收到停止信号,终止收集stats
        case <-time.After(refresh):
            // Timeout, gather a new set of stats
        }
    }

    if errc == nil {
        errc = <-db.quitChan
    }
    errc <- merr
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK