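Author: jaydenwen, backend development engineer at Tencent PCG
This article (Part 2) covers Sections 3.3 through 6. For the earlier sections, see Part 1: "A 10,000-Word Deep Dive: Dissecting the boltdb Source Code from the Bottom Up (Part 1)".
3.3 Operations on node
Before analyzing node, let's look at the official description of it: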
node represents an in-memory, deserialized page
A node can be a leaf node, a branch node, or the root node. It is the in-memory form of a page read from disk.
3.3.1 Definition of node
// node represents an in-memory, deserialized page.
type node struct {
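bucket *Bucket // the bucket this node belongs to
isLeaf bool
unbalanced bool // true if the node may need to be merged (rebalanced)
spilled bool // true once the node has already been spilled (split and written to dirty pages)
key []byte // the node's smallest (first) key; the parent branch refers to the node by it
pgid pgid // id of the page this node maps to
parent *node // the node's parent
children nodes // the node's child nodes
inodes inodes // the elements (index entries) stored in this node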
}
// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
type inode struct {
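// Distinguishes a sub-bucket leaf element from a regular leaf element:
// flags == 1 (bucketLeafFlag) marks a sub-bucket; otherwise it is a regular key/value.
flags uint32
// pgid is set only when the inode is a branch element; it is unused for leaf elements.
pgid pgid
key []byte
// value is empty for branch elements and set only for leaf elements.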
value []byte
}
type inodes []inode
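3.3.2 Converting between node and page
node has two methods, read(page) and write(page): read(page) builds a node from a page, and write(page) writes the current node into a page. We covered the conversion between node and page earlier (see Section 2.4), so it is not repeated here.
3.3.3 Insert, delete, update, and lookup on node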
put(k,v)
// put inserts a key/value.
// When putting a plain key/value, pgid does not need to be specified (pass 0).
// When putting a branch element, pgid must be specified and value is not needed.
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
if pgid >= n.bucket.tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
} else if len(oldKey) <= 0 {
panic("put: zero-length old key")
} else if len(newKey) <= 0 {
panic("put: zero-length new key")
}
// Find insertion index.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })
// Add capacity and shift nodes if we don't have an exact match and need to insert.
exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
if !exact {
n.inodes = append(n.inodes, inode{})
copy(n.inodes[index+1:], n.inodes[index:])
}
inode := &n.inodes[index]
inode.flags = flags
inode.key = newKey
inode.value = value
inode.pgid = pgid
_assert(len(inode.key) > 0, "put: zero-length inode key")
}
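The core of put() is a binary search for the insertion index followed by an append-and-shift when the key is new. The minimal sketch below isolates that pattern; `elem` and `putSorted` are hypothetical simplifications of inode and node.put (no pgid, flags, or high-water-mark checks), so treat it as an illustration rather than boltdb code.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// elem is a hypothetical, simplified stand-in for boltdb's inode.
type elem struct {
	key, value []byte
}

// putSorted inserts or overwrites key in a slice kept sorted by key,
// using the same sort.Search + append/copy technique as node.put.
func putSorted(elems []elem, key, value []byte) []elem {
	// First index whose key is >= the target key.
	index := sort.Search(len(elems), func(i int) bool {
		return bytes.Compare(elems[i].key, key) != -1
	})
	exact := index < len(elems) && bytes.Equal(elems[index].key, key)
	if !exact {
		// Grow by one and shift the tail right to open a slot at index.
		elems = append(elems, elem{})
		copy(elems[index+1:], elems[index:])
	}
	elems[index] = elem{key: key, value: value}
	return elems
}

func main() {
	var elems []elem
	for _, k := range []string{"b", "d", "a", "c", "b"} {
		elems = putSorted(elems, []byte(k), []byte("v-"+k))
	}
	for _, e := range elems {
		fmt.Printf("%s=%s ", e.key, e.value)
	}
	fmt.Println()
}
```

Inserting "b", "d", "a", "c" and then "b" again leaves four elements in key order: the second "b" lands on the existing slot, so `exact` is true and no shift happens.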
get(k)
node has no get(k) method; lookups are answered directly by the Cursor. See Cursor's keyValue() method.
del(k)
// del removes a key from the node.
func (n *node) del(key []byte) {
// Find index of key.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })
// Exit if the key isn't found.
if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
return
}
// Delete inode from the node.
n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)
// Mark the node as needing rebalancing.
n.unbalanced = true
}
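del() uses the same binary search, but acts only on an exact match and then flags the node for rebalancing. A minimal sketch with a hypothetical `leaf` type that keeps only sorted keys and the `unbalanced` flag:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// leaf is a hypothetical, simplified node: sorted keys plus the
// unbalanced flag that node.del sets after a successful delete.
type leaf struct {
	keys       [][]byte
	unbalanced bool
}

// del removes key if present, mirroring node.del: binary search,
// exact-match check, splice the slice, then mark for rebalancing.
func (l *leaf) del(key []byte) {
	index := sort.Search(len(l.keys), func(i int) bool {
		return bytes.Compare(l.keys[i], key) != -1
	})
	if index >= len(l.keys) || !bytes.Equal(l.keys[index], key) {
		return // key not found: nothing changes, flag untouched
	}
	l.keys = append(l.keys[:index], l.keys[index+1:]...)
	l.unbalanced = true
}

func main() {
	l := &leaf{keys: [][]byte{[]byte("a"), []byte("b"), []byte("c")}}
	l.del([]byte("x"))
	fmt.Println(len(l.keys), l.unbalanced) // 3 false
	l.del([]byte("b"))
	fmt.Println(len(l.keys), l.unbalanced) // 2 true
}
```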
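3.3.4 Splitting and merging nodes
Above we looked at the operations on a node, put and del. After these operations, the page backing a node may become too full or too empty, which is why nodes need to be split and merged. Here is a brief explanation of each.
Split: when a node holds too much data (in the simplest case, when it exceeds the page's fill threshold), it must be split in two, i.e. the data of one page is spread across two pages. This is implemented in spill().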
spill writes the nodes to dirty pages and splits nodes as it goes. Returns an error if dirty pages cannot be allocated.
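Merge: after one or more objects are deleted, a page may become so sparsely filled that a lot of space is wasted, so data from neighboring pages should be merged. This is implemented in rebalance().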
rebalance attempts to combine the node with sibling nodes if the node fill size is below a threshold or if there are not enough keys.
Because the code is long, it is not reproduced here; if you are interested, click here or here for an analysis of it.
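Since the spill()/rebalance() code is not reproduced here, the following is only a rough, hypothetical model of the two thresholds involved, based on my reading of boltdb: a node is split once its elements exceed about pageSize * FillPercent (DefaultFillPercent is 0.5), keeping at least two keys per page, and a node becomes a merge candidate when its size is at most a quarter of a page or it holds too few keys (1 for a leaf, 2 for a branch). `splitCounts` and `needsMerge` are illustrative names. `splitCounts` is deliberately simplified: unlike boltdb, it ignores page headers and splits any run larger than the threshold even when the whole node would still fit in one page.

```go
package main

import "fmt"

// minKeysPerPage mirrors boltdb's minimum number of keys per page.
const minKeysPerPage = 2

// splitCounts is a simplified, hypothetical model of node splitting.
// sizes[i] is the serialized size of element i. A new page is started once
// the current page already has minKeysPerPage elements and adding the next
// element would push it past pageSize*fillPercent. It returns how many
// elements land in each page.
func splitCounts(sizes []int, pageSize int, fillPercent float64) []int {
	threshold := int(float64(pageSize) * fillPercent)
	var counts []int
	count, used := 0, 0
	for _, s := range sizes {
		if count >= minKeysPerPage && used+s > threshold {
			counts = append(counts, count)
			count, used = 0, 0
		}
		count++
		used += s
	}
	if count > 0 {
		counts = append(counts, count)
	}
	return counts
}

// needsMerge models the rebalance check: a node is a merge candidate when
// its size is at most a quarter page or it holds too few keys.
func needsMerge(size, numKeys int, isLeaf bool, pageSize int) bool {
	minKeys := 2
	if isLeaf {
		minKeys = 1
	}
	return size <= pageSize/4 || numKeys <= minKeys
}

func main() {
	sizes := make([]int, 10)
	for i := range sizes {
		sizes[i] = 1000
	}
	fmt.Println(splitCounts(sizes, 4096, 0.5))    // [2 2 2 2 2]
	fmt.Println(splitCounts(sizes, 4096, 1.0))    // [4 4 2]
	fmt.Println(needsMerge(500, 10, true, 4096))  // true
	fmt.Println(needsMerge(2000, 10, true, 4096)) // false
}
```

A lower fill percent produces more, emptier pages, which leaves room for later inserts; a fill percent of 1.0 packs pages tightly, which suits append-mostly workloads.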
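3.4 Operations on Bucket
Having analyzed how to traverse and search a Bucket, let's now look at how to create, retrieve, and delete a Bucket object.
3.4.1 Creating a Bucket
1. Analysis of CreateBucketIfNotExists() and CreateBucket()
CreateBucket() creates a Bucket under the given key. It returns an error if a Bucket with that key already exists, or if the key already holds a regular value. Otherwise it finds the right position in the current Bucket, inserts a new Bucket there, and returns it to the caller.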
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
child, err := b.CreateBucket(key)
if err == ErrBucketExists {
return b.Bucket(key), nil
} else if err != nil {
return nil, err
}
return child, nil
}
// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
// ...
// Error checks omitted.
// Move cursor to correct position.
// Get a cursor.
c := b.Cursor()
// Seek to the position where the key belongs.
k, _, flags := c.seek(key)
// Return an error if there is an existing key.
if bytes.Equal(key, k) {
// The key is already a bucket.
if (flags & bucketLeafFlag) != 0 {
return nil, ErrBucketExists
}
// The key exists but is not a bucket.
return nil, ErrIncompatibleValue
}
// Create empty, inline bucket.
var bucket = Bucket{
bucket: &bucket{},
rootNode: &node{isLeaf: true},
FillPercent: DefaultFillPercent,
}
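// Serialize the new bucket into the value stored under key.
var value = bucket.write()
// Insert into node.
key = cloneBytes(key)
// The element goes into the leaf node's inodes.
// c.node() materializes the path to the leaf in memory, calling n.read(page) as needed.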
c.node().put(key, key, value, 0, bucketLeafFlag)
// Since subbuckets are not allowed on inline buckets, we need to
// dereference the inline page, if it exists. This will cause the bucket
// to be treated as a regular, non-inline bucket for the rest of the tx.
b.page = nil
// Look up and return the new bucket by key.
return b.Bucket(key), nil
}
// write allocates and writes a bucket to a byte slice.
// For an inline bucket, the bytes after bucketHeaderSize hold the bucket's page data.
func (b *Bucket) write() []byte {
// Allocate the appropriate size.
var n = b.rootNode
var value = make([]byte, bucketHeaderSize+n.size())
// Write a bucket header.
var bucket = (*bucket)(unsafe.Pointer(&value[0]))
*bucket = *b.bucket
// Convert byte slice to a fake page and write the root node.
var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
// Serialize the bucket's elements compactly into value.
n.write(p)
return value
}
// node returns the node that the cursor is currently positioned on.
func (c *Cursor) node() *node {
_assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")
// If the top of the stack is a leaf node then just return it.
if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
return ref.node
}
// Start from root and traverse down the hierarchy.
var n = c.stack[0].node
if n == nil {
n = c.bucket.node(c.stack[0].page.id, nil)
}
// Walk down through the branch nodes.
for _, ref := range c.stack[:len(c.stack)-1] {
_assert(!n.isLeaf, "expected branch node")
n = n.childAt(int(ref.index))
}
_assert(n.isLeaf, "expected leaf node")
return n
}
// put inserts a key/value.
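// When putting a plain key/value, pgid does not need to be specified.
// When putting a branch element, pgid must be specified and value is not needed.
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
// See put(k,v) in Section 3.3.3 for the full implementation.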
}
3.4.2 Retrieving a Bucket
Bucket() retrieves a Bucket by the given key and returns nil if it is not found.
// Bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
if b.buckets != nil {
if child := b.buckets[string(name)]; child != nil {
return child
}
}
// Move cursor to key.
c := b.Cursor()
k, v, flags := c.seek(name)
// Return nil if the key doesn't exist or it is not a bucket.
if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
return nil
}
// Otherwise create a bucket and cache it.
// Open the bucket from the value that was found.
var child = b.openBucket(v)
// Cache it to speed up later lookups.
if b.buckets != nil {
b.buckets[string(name)] = child
}
return child
}
// Helper method that re-interprets a sub-bucket value
// from a parent into a Bucket
func (b *Bucket) openBucket(value []byte) *Bucket {
var child = newBucket(b.tx)
// If unaligned load/stores are broken on this arch and value is
// unaligned simply clone to an aligned byte array.
unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0
if unaligned {
value = cloneBytes(value)
}
// If this is a writable transaction then we need to copy the bucket entry.
// Read-only transactions can point directly at the mmap entry.
if b.tx.writable && !unaligned {
child.bucket = &bucket{}
*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
} else {
child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
}
// Save a reference to the inline page if the bucket is inline.
if child.root == 0 {
child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
}
return &child
}
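write() and openBucket() agree on one layout for a sub-bucket's value: a fixed-size bucket header (the root page id and a sequence number, both 64-bit, so 16 bytes) followed, for an inline bucket, by the serialized root page; a root of 0 marks the bucket as inline. The sketch below reproduces that layout with encoding/binary instead of the unsafe pointer casts used by boltdb. The little-endian encoding, the helper names `encodeBucketValue`/`decodeBucketValue`, and the fake page bytes are assumptions for illustration (boltdb simply reinterprets memory in the machine's native layout).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// bucketHeaderSize matches the size of boltdb's bucket header:
// root pgid (uint64) + sequence (uint64).
const bucketHeaderSize = 16

// encodeBucketValue lays out a sub-bucket value: header first, then (for an
// inline bucket) the serialized root page. Hypothetical helper.
func encodeBucketValue(root, sequence uint64, inlinePage []byte) []byte {
	value := make([]byte, bucketHeaderSize+len(inlinePage))
	binary.LittleEndian.PutUint64(value[0:8], root)
	binary.LittleEndian.PutUint64(value[8:16], sequence)
	copy(value[bucketHeaderSize:], inlinePage)
	return value
}

// decodeBucketValue reverses encodeBucketValue. As in openBucket, a root of
// 0 means the bucket is inline and its page follows the header.
func decodeBucketValue(value []byte) (root, sequence uint64, inlinePage []byte) {
	root = binary.LittleEndian.Uint64(value[0:8])
	sequence = binary.LittleEndian.Uint64(value[8:16])
	if root == 0 {
		inlinePage = value[bucketHeaderSize:]
	}
	return root, sequence, inlinePage
}

func main() {
	v := encodeBucketValue(0, 7, []byte("fake-page"))
	root, seq, page := decodeBucketValue(v)
	fmt.Println(len(v), root, seq, string(page)) // 25 0 7 fake-page

	root, _, page = decodeBucketValue(encodeBucketValue(42, 0, nil))
	fmt.Println(root, page == nil) // 42 true
}
```

Keeping small buckets inline avoids spending a whole page on a bucket with only a few keys; once a bucket grows (or gets sub-buckets, as CreateBucket() notes), it is written out as a regular page tree with a non-zero root.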
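3.4.3 Deleting a Bucket
DeleteBucket() deletes the Bucket with the given key. It first recursively deletes the Bucket's sub-buckets, then releases the Bucket's pages, and finally removes its entry from the leaf node.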
// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exists, or if the key represents a non-bucket value.
func (b *Bucket) DeleteBucket(key []byte) error {
// Error checks omitted.
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
// Return an error if bucket doesn't exist or is not a bucket.
if !bytes.Equal(key, k) {
return ErrBucketNotFound
} else if (flags & bucketLeafFlag) == 0 {
return ErrIncompatibleValue
}
// Recursively delete all child buckets.
child := b.Bucket(key)
// Delete every sub-bucket under this bucket (ForEach reports sub-buckets with a nil value).
err := child.ForEach(func(k, v []byte) error {
if v == nil {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
}
return nil
})
if err != nil {
return err
}
// Remove cached copy.
delete(b.buckets, string(key))
// Release all bucket pages to freelist.
child.nodes = nil
child.rootNode = nil
child.free()
// Delete the node if we have a matching key.
c.node().del(key)
return nil
}
// del removes a key from the node.
func (n *node) del(key []byte) {
// See del(k) in Section 3.3.3 for the full implementation of node.del().
}
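DeleteBucket() works bottom-up: sub-buckets are deleted recursively before the bucket's own pages are released and its entry is removed from the parent leaf. The toy model below uses a hypothetical `toyBucket` that only lists its page ids and children, to show the resulting order in which pages are freed; it does not model the freelist or node removal.

```go
package main

import (
	"fmt"
	"sort"
)

// toyBucket is a hypothetical in-memory model of a bucket: the page ids
// it owns and its nested sub-buckets by name.
type toyBucket struct {
	pages    []int
	children map[string]*toyBucket
}

// deleteBucket follows the order used by DeleteBucket: recursively delete
// every sub-bucket, then release this bucket's pages, then remove its entry
// from the parent. Freed page ids are appended to *freed in that order.
func (b *toyBucket) deleteBucket(name string, freed *[]int) bool {
	child, ok := b.children[name]
	if !ok {
		return false // DeleteBucket returns ErrBucketNotFound here
	}
	// Collect and sort child names so the demo output is deterministic.
	names := make([]string, 0, len(child.children))
	for n := range child.children {
		names = append(names, n)
	}
	sort.Strings(names)
	for _, n := range names {
		child.deleteBucket(n, freed)
	}
	*freed = append(*freed, child.pages...)
	delete(b.children, name)
	return true
}

func main() {
	root := &toyBucket{children: map[string]*toyBucket{
		"users": {pages: []int{10, 11}, children: map[string]*toyBucket{
			"archive": {pages: []int{20}},
		}},
	}}
	var freed []int
	ok := root.deleteBucket("users", &freed)
	fmt.Println(ok, freed, len(root.children)) // true [20 10 11] 0
}
```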
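3.5 Inserting, retrieving, and deleting keys and values
The previous section covered how to create and retrieve a Bucket. With a Bucket in hand, we can perform the operations we care most about on key/value pairs: insert, delete, update, and lookup. Ultimately, every key/value operation is carried out on the underlying node, since nodes are where the actual data is stored.
3.5.1 Inserting a key/value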
// Put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction,
// if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
return ErrTxNotWritable
} else if len(key) == 0 {
return ErrKeyRequired
} else if len(key) > MaxKeySize {
return ErrKeyTooLarge
} else if int64(len(value)) > MaxValueSize {
return ErrValueTooLarge
}
// Move cursor to correct position.
c := b.Cursor()
k, _, flags := c.seek(key)
// Return an error if there is an existing key with a bucket value.
if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}
// Insert into node.
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
return nil
}
3.5.2 Retrieving a key/value
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (b *Bucket) Get(key []byte) []byte {
k, v, flags := b.Cursor().seek(key)
// Return nil if this is a bucket.
if (flags & bucketLeafFlag) != 0 {
return nil
}
// If our target node isn't the same key as what's passed in then return nil.
if !bytes.Equal(key, k) {
return nil
}
return v
}
3.5.3 Deleting a key/value
// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) error {
if b.tx.db == nil {
return ErrTxClosed
} else if !b.Writable() {
return ErrTxNotWritable
}
// Move cursor to correct position.
c := b.Cursor()
_, _, flags := c.seek(key)
// Return an error if there is already existing bucket value.
if (flags & bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}
// Delete the node if we have a matching key.
c.node().del(key)
return nil
}
3.5.4 Iterating over all key/values in a Bucket
// ForEach executes a function for each key/value pair in a bucket.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller. The provided function must not modify
// the bucket; this will result in undefined behavior.
func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
if b.tx.db == nil {
return ErrTxClosed
}
c := b.Cursor()
// Iterate over the key/value pairs.
for k, v := c.First(); k != nil; k, v = c.Next() {
if err := fn(k, v); err != nil {
return err
}
}
return nil
}
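Put(), Get(), and Delete() all seek to the key first and then consult bucketLeafFlag, so plain values and sub-buckets that share one keyspace never get mixed up. The sketch below models only those flag checks; `toyKV` (a map standing in for the B+ tree and cursor) and `errIncompatibleValue` are hypothetical, and only exact key matches are inspected.

```go
package main

import (
	"errors"
	"fmt"
)

// bucketLeafFlag marks an element whose value is a nested bucket
// (0x01, the same value boltdb uses).
const bucketLeafFlag = 0x01

// errIncompatibleValue is a hypothetical stand-in for ErrIncompatibleValue.
var errIncompatibleValue = errors.New("incompatible value")

type entry struct {
	value []byte
	flags uint32
}

// toyKV is a hypothetical stand-in for one bucket: a map replaces the
// B+ tree and cursor seek; only the flag checks mirror Put/Get/Delete.
type toyKV map[string]entry

// Put refuses to overwrite a sub-bucket, like Bucket.Put.
func (kv toyKV) Put(key, value []byte) error {
	if e, ok := kv[string(key)]; ok && e.flags&bucketLeafFlag != 0 {
		return errIncompatibleValue
	}
	kv[string(key)] = entry{value: value}
	return nil
}

// Get returns nil both for missing keys and for sub-buckets, like Bucket.Get.
func (kv toyKV) Get(key []byte) []byte {
	e, ok := kv[string(key)]
	if !ok || e.flags&bucketLeafFlag != 0 {
		return nil
	}
	return e.value
}

// Delete refuses to remove a sub-bucket (that is DeleteBucket's job) and
// is a no-op for missing keys, like Bucket.Delete.
func (kv toyKV) Delete(key []byte) error {
	if e, ok := kv[string(key)]; ok && e.flags&bucketLeafFlag != 0 {
		return errIncompatibleValue
	}
	delete(kv, string(key))
	return nil
}

func main() {
	kv := toyKV{"sub": {flags: bucketLeafFlag}}
	fmt.Println(kv.Put([]byte("k"), []byte("v")))   // <nil>
	fmt.Println(string(kv.Get([]byte("k"))))        // v
	fmt.Println(kv.Put([]byte("sub"), []byte("x"))) // incompatible value
	fmt.Println(kv.Get([]byte("sub")) == nil)       // true
}
```

A sub-bucket key behaves like a missing key for Get(), while Put() and Delete() refuse to touch it; sub-buckets are removed with DeleteBucket() instead.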
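3.6 Page splitting and merging in a Bucket
This part is long; if you are interested, click here or here to read it.
3.7 Summary
This chapter introduced two of boltdb's core data structures: Bucket and node. Why cover them together? Because in boltdb each Bucket corresponds to a B+ tree, and the structure of that B+ tree (root, leaf, and branch nodes) and its organization are implemented with node.
We focused on creating, retrieving, deleting, and iterating over a Bucket, and on inserting, updating, and deleting key/values. Iteration introduced the Cursor data structure: in boltdb, a Bucket (a B+ tree) is traversed by keeping the traversal path on a stack, which is the purpose of Cursor's stack.
Since every key/value operation on a Bucket is ultimately applied to the underlying node, we also covered node's methods, such as put, del, spill, and rebalance (lookups go through Cursor).
At this point, it should be clear how data is stored and organized, and how in-memory and on-disk data are converted and mapped to each other. The next chapter covers transactions in boltdb; only with transactions does it qualify as a complete database.
4. Transaction control in boltdb
Transactions are an essential feature of any database, and boltdb is no exception. Transactions immediately bring to mind the four ACID properties. Let's see how boltdb implements its transactions.
4.1 Overview of boltdb transactions
First, here is how the official boltdb documentation describes transactions: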
Bolt allows only one read-write transaction at a time but allows as many read-only transactions as you want at a time. Each transaction has a consistent view of the data as it existed when the transaction started.
Individual transactions and all objects created from them (e.g. buckets, keys) are not thread safe. To work with data in multiple goroutines you must start a transaction for each one or use locking to ensure only one goroutine accesses a transaction at a time. Creating transaction from the DB is thread safe.
Read-only transactions and read-write transactions should not depend on one another and generally shouldn't be opened simultaneously in the same goroutine. This can cause a deadlock as the read-write transaction needs to periodically re-map the data file but it cannot do so while a read-only transaction is open.
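To summarize: boltdb supports two kinds of transactions, read-write and read-only. Only one read-write transaction can run at a time, but any number of read-only transactions can run concurrently. Each transaction has its own consistent view of the data.
Note that a boltdb database can be opened in one of two modes: read-only or read-write. Internally, a different file lock (flock) is taken depending on the mode: read-only mode takes a shared lock, and read-write mode takes an exclusive lock. The lock and unlock code lives in bolt_unix.go and bolt_windows.go.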
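The shared/exclusive policy can be reproduced with flock(2). The sketch below is Unix-only (syscall.Flock is not available on Windows) and uses hypothetical helpers `tryLock` and `lockDemo`. It makes a single non-blocking attempt, whereas boltdb's flock helper keeps retrying (until options.Timeout, if one is set). It opens one file twice to stand in for two processes, which works because flock locks belong to each open file description.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// tryLock makes one non-blocking flock attempt on f: a shared lock for a
// read-only open, an exclusive lock for a read-write open.
func tryLock(f *os.File, exclusive bool) error {
	how := syscall.LOCK_SH
	if exclusive {
		how = syscall.LOCK_EX
	}
	return syscall.Flock(int(f.Fd()), how|syscall.LOCK_NB)
}

// lockDemo opens one temporary file twice (two independent open file
// descriptions, standing in for two processes) and reports whether two
// shared locks can coexist and whether an exclusive lock is refused while
// a shared lock is held.
func lockDemo() (sharedCoexist, exclusiveRefused bool, err error) {
	a, err := os.CreateTemp("", "flock-demo-*.db")
	if err != nil {
		return false, false, err
	}
	defer os.Remove(a.Name())
	defer a.Close()

	b, err := os.Open(a.Name())
	if err != nil {
		return false, false, err
	}
	defer b.Close()

	// First "process": read-only open, shared lock.
	if err := tryLock(a, false); err != nil {
		return false, false, err
	}
	// Second "process": another shared lock should succeed.
	sharedCoexist = tryLock(b, false) == nil
	// Drop it, then ask for an exclusive lock while a still holds its shared lock.
	_ = syscall.Flock(int(b.Fd()), syscall.LOCK_UN)
	exclusiveRefused = tryLock(b, true) == syscall.EWOULDBLOCK
	return sharedCoexist, exclusiveRefused, nil
}

func main() {
	shared, refused, err := lockDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("shared locks coexist:", shared, "exclusive refused:", refused)
}
```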
We will not go into the ACID properties here; readers can look them up. Let's get to the main topic.
4.2 Definition of the transaction type Tx
// txid represents the internal transaction identifier.
type txid uint64
// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
// Tx wraps both read-only and read-write transactions; the writable field distinguishes the two.
type Tx struct {
writable bool
managed bool
db *DB
meta *meta
root Bucket
pages map[pgid]*page
stats TxStats
// Callbacks run after a successful commit.
commitHandlers []func()
// WriteFlag specifies the flag for write-related methods like WriteTo().
// Tx opens the database file with the specified flag to copy the data.
// By default, the flag is unset, which works well for mostly in-memory
// workloads. For databases that are much larger than available RAM,
// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
WriteFlag int
}
// init initializes the transaction.
func (tx *Tx) init(db *DB) {
tx.db = db
tx.pages = nil
// Copy the meta page since it can be changed by the writer.
tx.meta = &meta{}
db.meta().copy(tx.meta)
// Copy over the root bucket.
tx.root = newBucket(tx)
tx.root.bucket = &bucket{}
// For a new database, meta.root == bucket{root: 3}.
*tx.root.bucket = tx.meta.root
// Increment the transaction id and add a page cache for writable transactions.
if tx.writable {
tx.pages = make(map[pgid]*page)
tx.meta.txid += txid(1)
}
}
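Tx.init() gives every transaction a private copy of the meta, which is what gives each transaction a consistent view, and only a writable transaction increments txid. A minimal model with hypothetical `db`, `tx`, and `meta` types reduced to a txid and a root page id (the real commit writes the meta to disk rather than assigning it):

```go
package main

import "fmt"

// meta is a hypothetical reduction of boltdb's meta page to two fields.
type meta struct {
	txid uint64
	root uint64 // root page id of the root bucket
}

type db struct{ meta meta }

type tx struct {
	writable bool
	meta     meta // private copy, like tx.meta in Tx.init
}

// begin mimics Tx.init: copy the current meta, and for a writable
// transaction bump the txid so its changes get a new, higher id.
func (d *db) begin(writable bool) *tx {
	t := &tx{writable: writable, meta: d.meta} // struct copy = snapshot
	if writable {
		t.meta.txid++
	}
	return t
}

// commit publishes the writer's meta copy.
func (d *db) commit(t *tx) { d.meta = t.meta }

func main() {
	d := &db{meta: meta{txid: 5, root: 3}}
	r := d.begin(false)
	w := d.begin(true)
	w.meta.root = 9 // the writer changes only its private copy
	fmt.Println(r.meta.txid, w.meta.txid, d.meta.root) // 5 6 3
	d.commit(w)
	fmt.Println(r.meta.root, d.meta.root, d.meta.txid) // 3 9 6
}
```

The reader keeps seeing root 3 even after the writer commits root 9, because its meta was copied when it began; in boltdb the old pages it references also stay intact until it closes, which is what the freelist release in beginRWTx() guards.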
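4.3 Implementation of Begin()
A note first: in boltdb, the method that starts a transaction belongs to the DB object, but for completeness we cover Begin() here. As mentioned, boltdb has two kinds of transactions, and Begin() runs different logic depending on the argument passed in. A read-write transaction acquires a lock when it starts (db.rwlock.Lock()) and releases it only when it commits or rolls back (db.rwlock.Unlock()). This confirms what we said earlier: only one read-write transaction can run at a time.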
// Begin starts a new transaction.
// Multiple read-only transactions can be used concurrently but only one
// write transaction can be used at a time. Starting multiple write transactions
// will cause the calls to block and be serialized until the current write
// transaction finishes.
//
// Transactions should not be dependent on one another. Opening a read
// transaction and a write transaction in the same goroutine can cause the
// writer to deadlock because the database periodically needs to re-mmap itself
// as it grows and it cannot do that while a read transaction is open.
//
// If a long running read transaction (for example, a snapshot transaction) is
// needed, you might want to set DB.InitialMmapSize to a large enough value
// to avoid potential blocking of write transaction.
//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error) {
if writable {
return db.beginRWTx()
}
return db.beginTx()
}
func (db *DB) beginTx() (*Tx, error) {
// Lock the meta pages while we initialize the transaction. We obtain
// the meta lock before the mmap lock because that's the order that the
// write transaction will obtain them.
db.metalock.Lock()
// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
// remapped.
db.mmaplock.RLock()
// Exit if the database is not open yet.
if !db.opened {
db.mmaplock.RUnlock()
db.metalock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{}
t.init(db)
// Keep track of transaction until it closes.
db.txs = append(db.txs, t)
n := len(db.txs)
// Unlock the meta pages.
db.metalock.Unlock()
// Update the transaction stats.
db.statlock.Lock()
db.stats.TxN++
db.stats.OpenTxN = n
db.statlock.Unlock()
return t, nil
}
func (db *DB) beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, ErrDatabaseReadOnly
}
// Obtain writer lock. This is released by the transaction when it closes.
// This enforces only one writer transaction at a time.
db.rwlock.Lock()
// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
// Free any pages associated with closed read-only transactions.
var minid txid = 0xFFFFFFFFFFFFFFFF
// Find the smallest txid among the open read-only transactions.
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}
if minid > 0 {
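// Release pages freed by transactions older than that txid. They could not be released earlier: even after the transaction that freed them finished, open read-only transactions might still be reading them.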
db.freelist.release(minid - 1)
}
return t, nil
}
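The release logic in beginRWTx() relies on the freelist keeping pages freed by a write transaction "pending" under that transaction's id until no open reader can still see them. The following model is a hypothetical simplification (boltdb's freelist also tracks a lookup cache and merges sorted id lists), but the release rule is the one in the code above: pages pending under any txid <= minid - 1 become reusable.

```go
package main

import (
	"fmt"
	"sort"
)

// freelist is a hypothetical simplification of boltdb's freelist:
// ids are immediately reusable pages; pending holds pages freed by a
// write transaction, keyed by that transaction's id.
type freelist struct {
	ids     []uint64
	pending map[uint64][]uint64
}

// free records pages released by write transaction txid; they are not
// reusable yet because older readers may still see them.
func (f *freelist) free(txid uint64, pgids ...uint64) {
	f.pending[txid] = append(f.pending[txid], pgids...)
}

// release moves every pending page freed by a transaction with id <= txid
// into ids, as beginRWTx does with release(minid - 1).
func (f *freelist) release(txid uint64) {
	for tid, ids := range f.pending {
		if tid <= txid {
			f.ids = append(f.ids, ids...)
			delete(f.pending, tid)
		}
	}
	sort.Slice(f.ids, func(i, j int) bool { return f.ids[i] < f.ids[j] })
}

// oldestReader returns the smallest txid among open read transactions,
// or the maximum uint64 if there are none (the same sentinel as beginRWTx).
func oldestReader(readers []uint64) uint64 {
	minid := uint64(0xFFFFFFFFFFFFFFFF)
	for _, id := range readers {
		if id < minid {
			minid = id
		}
	}
	return minid
}

func main() {
	f := &freelist{pending: map[uint64][]uint64{}}
	f.free(4, 10, 11) // pages freed by write tx 4
	f.free(7, 12)     // page freed by write tx 7
	// Open readers started at txids 6 and 9, so minid is 6.
	minid := oldestReader([]uint64{6, 9})
	f.release(minid - 1)
	fmt.Println(f.ids, len(f.pending)) // [10 11] 1
}
```

This is also why the Tx comment warns that a long-running read transaction can make the database grow quickly: while it stays open, minid cannot advance and pending pages pile up.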
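4.4 Implementation of Commit()
The overall flow of Commit() is:
1. Rebalance (merge) and spill (split) nodes as needed.
2. Write the freelist: free its old page and allocate fresh pages sized for it, since it may have grown beyond its previous space.
3. Sort the pages modified by the transaction (to keep I/O as sequential as possible), write them to disk in order, and then flush (fsync).
4. Once the data has been written successfully, write the meta page to disk and flush it to make the commit durable.
5. If any of the steps above fails, the transaction is rolled back.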
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
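// Data pages are written first, then the meta page.
// If the machine crashes after the data is written but before the meta page is updated, how is the data recovered?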
func (tx *Tx) Commit() error {
// Error checks omitted here.
// Rebalance nodes which have had deletions.
var startTime = time.Now()
tx.root.rebalance()
if tx.stats.Rebalance > 0 {
tx.stats.RebalanceTime += time.Since(startTime)
}
// Split pages.
// spill data onto dirty pages.
startTime = time.Now()
// This adds pages to the tx.pages cache.
if err := tx.root.spill(); err != nil {
tx.rollback()
return err
}
tx.stats.SpillTime += time.Since(startTime)
// Free the old root bucket.
tx.meta.root.root = tx.root.root
opgid := tx.meta.pgid
// Free the freelist and allocate new pages for it. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
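// Free the old freelist page, then allocate new pages for the freelist and write it there.
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
// The freelist may have grown, so new pages are allocated to hold it:
// beginning this write transaction released pages held back for earlier read transactions, so the freelist may no longer fit in its old space.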
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.rollback()
return err
}
// Write the freelist into the new contiguous pages.
if err := tx.db.freelist.write(p); err != nil {
tx.rollback()
return err
}
// Record the freelist's new page id in the meta.
tx.meta.freelist = p.id
// If the high water mark has moved up then attempt to grow the database.
// allocate() may have raised meta.pgid.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
return err
}
}
// Write dirty pages to disk.
startTime = time.Now()
if err := tx.write(); err != nil {
tx.rollback()
return err
}
// If strict mode is enabled then perform a consistency check.
// Only the first consistency error is reported in the panic.
if tx.db.StrictMode {
// Consistency check omitted here.
}
// Write meta to disk.
if err := tx.writeMeta(); err != nil {
tx.rollback()
return err
}
tx.stats.WriteTime += time.Since(startTime)
// Finalize the transaction.
tx.close()
// Execute commit handlers now that the locks have been removed.
for _, fn := range tx.commitHandlers {
fn()
}
return nil
}
// write writes any dirty pages to disk.
func (tx *Tx) write() error {
// Sort pages by id.
// so that pages are written to the file in order.
pages := make(pages, 0, len(tx.pages))
for _, p := range tx.pages {
pages = append(pages, p)
}
// Clear out page cache early.
tx.pages = make(map[pgid]*page)
sort.Sort(pages)
// Write pages to disk in order.
for _, p := range pages {
// Size in bytes and file offset of this page run.
size := (int(p.overflow) + 1) * tx.db.pageSize
offset := int64(p.id) * int64(tx.db.pageSize)
// Write out page in "max allocation" sized chunks.
ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
// Write this page run in chunks.
for {
// Limit our write to our max allocation size.
sz := size
// maxAllocSize is 0x7FFFFFFF (about 2 GB) on amd64.
if sz > maxAllocSize-1 {
sz = maxAllocSize - 1
}
// Write chunk to disk.
buf := ptr[:sz]
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}
// Update statistics.
tx.stats.Write++
// Exit inner for loop if we've written all the chunks.
size -= sz
if size == 0 {
break
}
// Otherwise move offset forward and move pointer to next chunk.
offset += int64(sz)
ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
}
}
// Ignore file sync if flag is set on DB.
if !tx.db.NoSync || IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
return err
}
}
// Put small pages back to page pool.
for _, p := range pages {
// Ignore page sizes over 1 page.
// These are allocated using make() instead of the page pool.
if int(p.overflow) != 0 {
continue
}
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]
// See go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
// Zero the buffer, then return it to pagePool.
for i := range buf {
buf[i] = 0
}
tx.db.pagePool.Put(buf)
}
return nil
}
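tx.write() sorts the dirty pages by id, computes each page run's byte offset as id * pageSize and its length as (overflow + 1) * pageSize, and issues writeAt calls in chunks no larger than maxAllocSize - 1. The sketch below computes that write plan without touching a file; `dirtyPage`, `writeOp`, and `planWrites` are hypothetical names, and the tiny chunk limit in main exists only to make the chunking visible.

```go
package main

import (
	"fmt"
	"sort"
)

// dirtyPage is a hypothetical reduction of a dirty page: its id and how
// many extra (overflow) pages follow it contiguously.
type dirtyPage struct {
	id       uint64
	overflow uint32
}

// writeOp is one writeAt call: a byte offset in the file and a length.
type writeOp struct {
	offset int64
	size   int
}

// planWrites mirrors the shape of tx.write: sort pages by id (in place) so
// the file is written front to back, then split each page run into chunks
// of at most maxChunk bytes.
func planWrites(pages []dirtyPage, pageSize, maxChunk int) []writeOp {
	sort.Slice(pages, func(i, j int) bool { return pages[i].id < pages[j].id })
	var ops []writeOp
	for _, p := range pages {
		size := (int(p.overflow) + 1) * pageSize
		offset := int64(p.id) * int64(pageSize)
		for size > 0 {
			sz := size
			if sz > maxChunk {
				sz = maxChunk
			}
			ops = append(ops, writeOp{offset: offset, size: sz})
			offset += int64(sz)
			size -= sz
		}
	}
	return ops
}

func main() {
	pages := []dirtyPage{{id: 9}, {id: 4, overflow: 2}, {id: 7}}
	// Small maxChunk so the 3-page run at id 4 needs several writes.
	for _, op := range planWrites(pages, 4096, 5000) {
		fmt.Println(op.offset, op.size)
	}
}
```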
// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
// Create a temporary buffer for the meta page.
buf := make([]byte, tx.db.pageSize)
p := tx.db.pageInBuffer(buf, 0)
// Serialize the transaction's meta into the page.
tx.meta.write(p)
// Write the meta page to file.
if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
return err
}
if !tx.db.NoSync || IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
return err
}
}
// Update statistics.
tx.stats.Write++
return nil
}
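The comment at the top of Commit() asks what happens if the machine crashes after the data pages are written but before the meta page is. The answer lies in the two meta pages: in boltdb, meta.write() places a transaction's meta on page txid % 2, so a commit never overwrites the meta of the previous commit, and DB.meta() picks the meta with the higher txid that passes validation, falling back to the other one. As I understand it, the failed commit's data went copy-on-write into pages the previous committed state does not use, so a torn or missing meta write just leaves the database at the previous commit. The sketch below models only the selection rule; `metaPage`, `metaSlot`, and `pickMeta` are hypothetical, and a boolean stands in for the checksum check in meta.validate().

```go
package main

import "fmt"

// metaPage is a hypothetical reduction of a meta page: the committing
// txid and whether its checksum verified.
type metaPage struct {
	txid  uint64
	valid bool
}

// metaSlot returns which of the two meta pages (0 or 1) txid is written to.
func metaSlot(txid uint64) int { return int(txid % 2) }

// pickMeta follows the rule of DB.meta(): prefer the page with the higher
// txid, but fall back to the other page if it fails validation.
func pickMeta(m0, m1 metaPage) (metaPage, bool) {
	a, b := m0, m1
	if m1.txid > m0.txid {
		a, b = m1, m0
	}
	if a.valid {
		return a, true
	}
	if b.valid {
		return b, true
	}
	return metaPage{}, false // boltdb panics when both meta pages are invalid
}

func main() {
	var pages [2]metaPage
	pages[metaSlot(6)] = metaPage{txid: 6, valid: true}  // last good commit
	pages[metaSlot(7)] = metaPage{txid: 7, valid: false} // crash while writing tx 7's meta
	m, ok := pickMeta(pages[0], pages[1])
	fmt.Println(m.txid, ok) // 6 true
}
```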
// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
if err != nil {
return nil, err
}
// Save to our page cache.
tx.pages[p.id] = p
// Update statistics.
tx.stats.PageCount++
tx.stats.PageAlloc += count * tx.db.pageSize
return p, nil
}
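4.5 Implementation of Rollback()
Rollback() does different work depending on the transaction type:
If the transaction is read-only, it only needs to find itself in db.txs and remove itself. If the transaction is read-write, it must discard the pages the freelist recorded as freed by this transaction, and rebuild the free list by reloading it from the on-disk freelist page.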
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
_assert(!tx.managed, "managed tx rollback not allowed")
if tx.db == nil {
return ErrTxClosed
}
tx.rollback()
return nil
}
func (tx *Tx) rollback() {
if tx.db == nil {
return
}
if tx.writable {
// Drop the pages recorded as freed by this transaction.
tx.db.freelist.rollback(tx.meta.txid)
// Rebuild the free list from the on-disk freelist page.
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
}
tx.close()
}
func (tx *Tx) close() {
if tx.db == nil {
return
}
if tx.writable {
// Grab freelist stats.
var freelistFreeN = tx.db.freelist.free_count()
var freelistPendingN = tx.db.freelist.pending_count()
var freelistAlloc = tx.db.freelist.size()
// Remove transaction ref & writer lock.
tx.db.rwtx = nil
tx.db.rwlock.Unlock()
// Merge statistics.
tx.db.statlock.Lock()
tx.db.stats.FreePageN = freelistFreeN
tx.db.stats.PendingPageN = freelistPendingN
tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
tx.db.stats.FreelistInuse = freelistAlloc
tx.db.stats.TxStats.add(&tx.stats)
tx.db.statlock.Unlock()
} else {
// Read-only transaction.
tx.db.removeTx(tx)
}
// Clear all references.
tx.db = nil
tx.meta = nil
tx.root = Bucket{tx: tx}
tx.pages = nil
}
// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
// Release the read lock on the mmap.
db.mmaplock.RUnlock()
// Use the meta lock to restrict access to the DB object.
db.metalock.Lock()
// Remove the transaction.
for i, t := range db.txs {
if t == tx {
last := len(db.txs) - 1
db.txs[i] = db.txs[last]
db.txs[last] = nil
db.txs = db.txs[:last]
break
}
}
n := len(db.txs)
// Unlock the meta pages.
db.metalock.Unlock()
// Merge statistics.
db.statlock.Lock()
db.stats.OpenTxN = n
db.stats.TxStats.add(&tx.stats)
db.statlock.Unlock()
}
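4.6 Summary
This chapter analyzed how transactions are implemented inside boltdb and walked through the code of the core transaction methods. At this point, essentially all the core components of a database are in place. What remains is to assemble them into a complete database that can be used externally. The next chapter analyzes the internals of boltdb's DB object.
5. Analysis of the DB object in boltdb
Earlier we covered how boltdb organizes data on disk (page), and then how that on-disk data is represented in memory (node). Next we covered the Bucket object that manages a collection of key/values, and the Cursor object used to traverse a Bucket. Finally, we covered in detail how boltdb implements transactions (Tx). We are now familiar with each of boltdb's individual components, so it is time to make them work together, which is the job of boltdb's top-level DB object. This chapter covers the DB object's methods and their internal implementation.
5.1 DB structure
In boltdb, DB is a struct that wraps many fields. Some fields carry added comments; for the rest, the original English comments are clear and easy to follow.
// Some constant definitions omitted.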
// Default values if not set in a DB instance.
const (
DefaultMaxBatchSize int = 1000
DefaultMaxBatchDelay = 10 * time.Millisecond
// 16 MB
DefaultAllocSize = 16 * 1024 * 1024
)
// default page size for db is set to the OS page size.
var defaultPageSize = os.Getpagesize()
// DB represents a collection of buckets persisted to a file on disk.
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
type DB struct {
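// For brevity, some configuration fields (MaxBatchSize, MaxBatchDelay, MmapFlags, NoGrowSync, etc.) are omitted here. See the source code, or the book linked at the end of the article, for the full definition.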
path string
file *os.File // the on-disk file that stores the data
lockfile *os.File // windows only
dataref []byte // mmap'ed readonly, write throws SEGV
// the address range mapped in via mmap
data *[maxMapSize]byte
datasz int
filesz int // current on disk file size
// the two meta pages
meta0 *meta
meta1 *meta
pageSize int
opened bool
rwtx *Tx // the current read-write transaction, if any
txs []*Tx // the open read-only transactions
freelist *freelist // the free page list
stats Stats
pagePool sync.Pool
batchMu sync.Mutex
batch *batch
rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping.
statlock sync.RWMutex // Protects stats access.
ops struct {
writeAt func(b []byte, off int64) (n int, err error)
}
// Read only mode.
// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
readOnly bool
}
5.2 Public interface
1. Open(): create or open a database
// Open creates and opens a database at the given path.
// If the file does not exist then it will be created automatically.
// Passing in nil options will cause Bolt to open the database with the default options.
// Creates (or opens) a database.
func Open(path string, mode os.FileMode, options *Options) (*DB, error)
2. View(): read-only queries
// View executes a function within the context of a managed read-only transaction.
// Any error that is returned from the function is returned from the View() method.
// Attempting to manually rollback within the function will cause a panic.
func (db *DB) View(fn func(*Tx) error) error
3. Update(): read-write updates
// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error
4. Batch(): batched updates
// Batch calls fn as part of a batch. It behaves similar to Update,
// except:
//
// 1. concurrent Batch calls can be combined into a single Bolt
// transaction.
//
// 2. the function passed to Batch may be called multiple times,
// regardless of whether it returns error or not.
//
// This means that Batch function side effects must be idempotent and
// take permanent effect only after a successful return is seen in
// caller.
//
// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
// and DB.MaxBatchDelay, respectively.
//
// Batch is only useful when there are multiple goroutines calling it.
func (db *DB) Batch(fn func(*Tx) error) error
5. Begin(): start a transaction
// Begin starts a new transaction.
// Multiple read-only transactions can be used concurrently but only one
// write transaction can be used at a time. Starting multiple write transactions
// will cause the calls to block and be serialized until the current write
// transaction finishes.
//
// Transactions should not be dependent on one another. Opening a read
// transaction and a write transaction in the same goroutine can cause the
// writer to deadlock because the database periodically needs to re-mmap itself
// as it grows and it cannot do that while a read transaction is open.
//
// If a long running read transaction (for example, a snapshot transaction) is
// needed, you might want to set DB.InitialMmapSize to a large enough value
// to avoid potential blocking of write transaction.
//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error)
Note: for an analysis of Begin()'s implementation, see Section 4.3; it is not repeated below.
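View() and Update() wrap Begin() in a managed transaction. Per the doc comments above, Update() commits when fn returns nil and rolls back otherwise, and a manual Commit/Rollback inside fn panics. The sketch below models that control flow with a hypothetical `fakeTx` that only records its outcome; it is not boltdb's code, but follows the same pattern: mark the transaction managed while fn runs, roll back on error, and make sure a panic in fn still ends in a rollback.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbort is a hypothetical error that fn returns to abort the transaction.
var errAbort = errors.New("abort")

// fakeTx is a hypothetical transaction that only records its outcome.
type fakeTx struct {
	managed bool
	state   string // "open", "committed" or "rolled back"
}

func (t *fakeTx) Commit() error {
	if t.managed {
		panic("managed tx commit not allowed")
	}
	t.state = "committed"
	return nil
}

func (t *fakeTx) Rollback() error {
	if t.managed {
		panic("managed tx rollback not allowed")
	}
	t.state = "rolled back"
	return nil
}

// update models the control flow documented for DB.Update: run fn in a
// managed transaction, commit if it returns nil, roll back otherwise.
func update(t *fakeTx, fn func(*fakeTx) error) error {
	// If fn panics, still roll back before the panic propagates.
	defer func() {
		if t.state == "open" {
			t.managed = false
			_ = t.Rollback()
		}
	}()
	t.managed = true // manual Commit/Rollback inside fn now panics
	err := fn(t)
	t.managed = false
	if err != nil {
		_ = t.Rollback()
		return err
	}
	return t.Commit()
}

func main() {
	good := &fakeTx{state: "open"}
	err := update(good, func(*fakeTx) error { return nil })
	fmt.Println(err, good.state) // <nil> committed

	bad := &fakeTx{state: "open"}
	err = update(bad, func(*fakeTx) error { return errAbort })
	fmt.Println(err, bad.state) // abort rolled back
}
```

Batch() layers on top of this pattern by merging concurrent calls into one transaction, which is why its doc comment requires fn to be idempotent: fn may run more than once.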
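Below we analyze the interfaces above one by one.
5.3 Analysis of Open()
Open() creates a boltdb DB object. Under the hood it creates or opens the file that stores the data: if the specified file does not exist, boltdb creates a new data file; otherwise it loads the contents of the existing database file.
Note that boltdb decides whether to take an exclusive or a shared lock based on the options passed to Open().
New file: init() is called. It creates the file, writes meta information to pages 0 and 1, the freelist to page 2, and an empty bucket leaf page to page 3, then flushes everything to disk.
Existing file: page 0, which holds meta information, is read and its checksum is verified. If verification passes, pageSize is taken from it; otherwise the operating system's default page size (usually 4 KB) is used.
After that, the data file is mapped into memory with mmap, and finally db's freelist field is initialized from the freelist data stored on disk.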
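For a brand-new file, init() lays out four pages: meta pages at pgid 0 and 1 (with txid 0 and 1), an empty freelist at pgid 2, and an empty leaf page for the root bucket at pgid 3, so the high-water mark meta.pgid starts at 4. The sketch below computes where each of those pages lives in the file for a given page size; `pageInfo`, `initialLayout`, and the role labels are hypothetical, while the page assignments follow the init() code below and the description above.

```go
package main

import (
	"fmt"
	"os"
)

// pageInfo describes one page of a freshly initialized bolt file.
type pageInfo struct {
	pgid   int
	offset int64
	role   string
}

// initialLayout returns the four pages init() writes, with their file
// offsets (pgid * pageSize), plus the total initial file size.
func initialLayout(pageSize int) ([]pageInfo, int64) {
	roles := []string{"meta (txid 0)", "meta (txid 1)", "freelist", "root bucket leaf"}
	pages := make([]pageInfo, len(roles))
	for i, r := range roles {
		pages[i] = pageInfo{pgid: i, offset: int64(i) * int64(pageSize), role: r}
	}
	return pages, int64(len(roles)) * int64(pageSize)
}

func main() {
	// Like init(), use the OS page size (commonly 4096).
	pages, total := initialLayout(os.Getpagesize())
	for _, p := range pages {
		fmt.Printf("pgid %d at offset %d: %s\n", p.pgid, p.offset, p.role)
	}
	fmt.Println("initial file size:", total)
}
```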
// Open creates and opens a database at the given path.
// If the file does not exist then it will be created automatically.
// Passing in nil options will cause Bolt to open the database with the default options.
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
var db = &DB{opened: true}
// Set default options if no options are provided.
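// Option initialization omitted here. Among other things, it sets flag := os.O_RDWR, or os.O_RDONLY (plus db.readOnly = true) when options.ReadOnly is set.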
// Open data file and separate sync handler for metadata writes.
db.path = path
var err error
// Open the db file.
if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
_ = db.close()
return nil, err
}
// Lock file so that other processes using Bolt in read-write mode cannot
// use the database at the same time. This would cause corruption since
// the two processes would write meta pages and free pages separately.
// The database file is locked exclusively (only one process can grab the lock)
// if !options.ReadOnly.
// The database file is locked using the shared lock (more than one process may
// hold a lock at the same time) otherwise (options.ReadOnly is set).
// Read-only: shared lock; otherwise: exclusive lock.
if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
_ = db.close()
return nil, err
}
// Default values for test hooks
db.ops.writeAt = db.file.WriteAt
// Initialize the database if it doesn't exist.
if info, err := db.file.Stat(); err != nil {
return nil, err
} else if info.Size() == 0 {
// Initialize new files with meta pages.
// Initialize the new db file
if err := db.init(); err != nil {
return nil, err
}
} else {
// Not a new file: read the meta data from the first page
// Read the first meta page to determine the page size.
// 0x1000 = 2^12 = 4096 bytes, exactly 4KB
var buf [0x1000]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
// Only the page size is taken from this meta page
m := db.pageInBuffer(buf[:], 0).meta()
if err := m.validate(); err != nil {
// If we can't read the page size, we can assume it's the same
// as the OS -- since that's how the page size was chosen in the
// first place.
//
// If the first page is invalid and this OS uses a different
// page size than what the database was created with then we
// are out of luck and cannot access the database.
db.pageSize = os.Getpagesize()
} else {
db.pageSize = int(m.pageSize)
}
}
}
// Initialize page pool.
db.pagePool = sync.Pool{
New: func() interface{} {
// One page-sized buffer (db.pageSize bytes, typically 4KB)
return make([]byte, db.pageSize)
},
}
// Memory map the data file.
// mmap the db file's data into memory
if err := db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
// Read in the freelist.
db.freelist = newFreelist()
// db.meta().freelist is the page id of the freelist (2 in a freshly created file)
// Read that page
// and rebuild the in-memory freelist from it
db.freelist.read(db.page(db.meta().freelist))
// Mark the database as opened and return.
return db, nil
}
// init creates a new database file and initializes its meta pages.
func (db *DB) init() error {
// Set the page size to the OS page size.
db.pageSize = os.Getpagesize()
// Create two meta pages on a buffer.
buf := make([]byte, db.pageSize*4)
for i := 0; i < 2; i++ {
p := db.pageInBuffer(buf[:], pgid(i))
p.id = pgid(i)
// Pages 0 and 1 hold the meta data
p.flags = metaPageFlag
// Initialize the meta page.
m := p.meta()
m.magic = magic
m.version = version
m.pageSize = uint32(db.pageSize)
m.freelist = 2
m.root = bucket{root: 3}
m.pgid = 4
m.txid = txid(i)
m.checksum = m.sum64()
}
// Write an empty freelist on the third page (pgid 2).
p := db.pageInBuffer(buf[:], pgid(2))
p.id = pgid(2)
p.flags = freelistPageFlag
p.count = 0
// Write an empty leaf page on the fourth page (pgid 3); it is the root bucket's page (m.root above).
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
p.flags = leafPageFlag
p.count = 0
// Write the buffer to our data file.
// Write all 4 pages
if _, err := db.ops.writeAt(buf, 0); err != nil {
return err
}
// Flush to disk
if err := fdatasync(db); err != nil {
return err
}
return nil
}
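The sketch below makes the layout produced by init() concrete. It builds the same four-page buffer with encoding/binary and reads back each page's type. The 16-byte header (id uint64, flags uint16, count uint16, overflow uint32) and the flag values match bolt's page struct as far as I know. Little-endian encoding is an assumption, since bolt writes the struct in native byte order through unsafe. The meta fields themselves are omitted.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Page flags as defined in bolt's page.go.
const (
	branchPageFlag   uint16 = 0x01
	leafPageFlag     uint16 = 0x02
	metaPageFlag     uint16 = 0x04
	freelistPageFlag uint16 = 0x10
)

// writeHeader fills the header at the start of page id.
// Assumed layout: id(8) | flags(2) | count(2) | overflow(4), little-endian.
func writeHeader(buf []byte, pageSize int, id uint64, flags, count uint16) {
	p := buf[int(id)*pageSize:]
	binary.LittleEndian.PutUint64(p[0:], id)
	binary.LittleEndian.PutUint16(p[8:], flags)
	binary.LittleEndian.PutUint16(p[10:], count)
	binary.LittleEndian.PutUint32(p[12:], 0) // overflow: no extra pages
}

// initLayout mimics init(): pages 0-1 meta, page 2 empty freelist, page 3 empty leaf.
func initLayout(pageSize int) []byte {
	buf := make([]byte, pageSize*4)
	for i := uint64(0); i < 2; i++ {
		writeHeader(buf, pageSize, i, metaPageFlag, 0) // meta fields omitted
	}
	writeHeader(buf, pageSize, 2, freelistPageFlag, 0)
	writeHeader(buf, pageSize, 3, leafPageFlag, 0)
	return buf
}

// pageFlags reads back the flags of page id.
func pageFlags(buf []byte, pageSize, id int) uint16 {
	return binary.LittleEndian.Uint16(buf[id*pageSize+8:])
}

func main() {
	const pageSize = 4096
	buf := initLayout(pageSize)
	names := map[uint16]string{
		branchPageFlag: "branch", leafPageFlag: "leaf",
		metaPageFlag: "meta", freelistPageFlag: "freelist",
	}
	for id := 0; id < 4; id++ {
		fmt.Printf("pgid %d: %s\n", id, names[pageFlags(buf, pageSize, id)])
	}
	// pgid 0: meta, pgid 1: meta, pgid 2: freelist, pgid 3: leaf
}
```

This also explains why m.pgid is set to 4 in init(): pages 0 through 3 are already in use, so the high-water mark starts at 4.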
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id pgid) *page {
pos := id * pgid(db.pageSize)
return (*page)(unsafe.Pointer(&db.data[pos]))
}
// pageInBuffer retrieves a page reference from a given byte array based on the current page size.
func (db *DB) pageInBuffer(b []byte, id pgid) *page {
return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)]))
}
// mmap opens the underlying memory-mapped file and initializes the meta references.
// minsz is the minimum size that the new mmap can be.
func (db *DB) mmap(minsz int) error {
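// This is where the mmap happens. The body is omitted here; interested readers can consult the corresponding section of the online book for details.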
}
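The body of mmap() is omitted above. On Unix-like systems, bolt maps the file read-only with MAP_SHARED. Writes go through the file instead (see db.ops.writeAt in Open()), and page() then simply indexes into the mapped bytes. The sketch below shows such a read-only mapping on Linux or macOS using syscall.Mmap. mapFile and demo are hypothetical helpers. bolt's real mmap() also computes a growing mapping size and re-reads and validates the two meta pages after mapping.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// mapFile maps the first size bytes of f read-only and shared.
func mapFile(f *os.File, size int) ([]byte, error) {
	return syscall.Mmap(int(f.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED)
}

func demo() (before, after byte, err error) {
	f, err := os.CreateTemp("", "mmap-demo-*.db")
	if err != nil {
		return 0, 0, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	const pageSize = 4096
	if err = f.Truncate(4 * pageSize); err != nil { // a 4-page file of zeros
		return 0, 0, err
	}
	data, err := mapFile(f, 4*pageSize)
	if err != nil {
		return 0, 0, err
	}
	defer syscall.Munmap(data)

	// Reading "page 3" is just indexing: the same idea as &db.data[id*pageSize].
	before = data[3*pageSize]

	// Write through the file, not the mapping; MAP_SHARED makes the change visible.
	if _, err = f.WriteAt([]byte{0x2A}, 3*pageSize); err != nil {
		return 0, 0, err
	}
	after = data[3*pageSize]
	return before, after, nil
}

func main() {
	b, a, err := demo()
	fmt.Println(b, a, err) // 0 42 <nil>
}
```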
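5.4 Analysis of the db.View() implementation
View() executes a function inside a read-only transaction. The transaction's lifecycle is managed through Tx. A read-only transaction is never committed, so View() always ends with a rollback.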
// View executes a function within the context of a managed read-only transaction.
// Any error that is returned from the function is returned from the View() method.
//
// Attempting to manually rollback within the function will cause a panic.
func (db *DB) View(fn func(*Tx) error) error {
t, err := db.Begin(false)
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if t.db != nil {
t.rollback()
}
}()
// Mark as a managed tx so that the inner function cannot manually rollback.
t.managed = true
// If an error is returned from the function then pass it through.
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
if err := t.Rollback(); err != nil {
return err
}
return nil
}
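The control flow of View() has three parts. It opens a read-only tx, forbids a manual rollback while fn runs (through the managed flag), and always rolls back afterwards, including when fn panics. The toy model below uses a hypothetical toyTx type, not bolt's Tx, and the panic message is illustrative. It shows that calling Rollback inside fn panics, while the deferred internal rollback still closes the transaction.

```go
package main

import (
	"errors"
	"fmt"
)

// toyTx is a hypothetical stand-in for bolt's Tx.
type toyTx struct {
	open      bool
	managed   bool
	rollbacks int // how many times the tx was actually rolled back
}

// rollback is the internal path: no managed check.
func (t *toyTx) rollback() {
	if t.open {
		t.open = false
		t.rollbacks++
	}
}

// Rollback is the public path: forbidden while fn is running.
func (t *toyTx) Rollback() error {
	if t.managed {
		panic("managed tx rollback not allowed")
	}
	t.rollback()
	return nil
}

// view follows the same shape as DB.View.
func view(t *toyTx, fn func(*toyTx) error) error {
	t.open = true
	defer func() {
		if t.open { // still open here only if fn panicked
			t.rollback()
		}
	}()
	t.managed = true
	err := fn(t)
	t.managed = false
	if err != nil {
		_ = t.Rollback()
		return err
	}
	return t.Rollback()
}

func main() {
	tx1 := &toyTx{}
	err := view(tx1, func(*toyTx) error { return errors.New("boom") })
	fmt.Println(err, tx1.rollbacks) // boom 1

	tx2 := &toyTx{}
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		_ = view(tx2, func(tx *toyTx) error { return tx.Rollback() })
	}()
	fmt.Println(tx2.open, tx2.rollbacks) // false 1
}
```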
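5.5 Analysis of the db.Update() implementation
Update() executes a function inside a read-write transaction. Beginning, committing and rolling back the transaction are all handled internally through Tx.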
// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
//
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error {
t, err := db.Begin(true)
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if t.db != nil {
t.rollback()
}
}()
// Mark as a managed tx so that the inner function cannot manually commit.
t.managed = true
// If an error is returned from the function then rollback and return error.
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
return t.Commit()
}
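5.6 Analysis of the db.Batch() implementation
Now let us briefly look at Batch(). As we saw in the section on the DB definition, a DB object holds a batch object that is shared by the whole DB. When Batch() is called, the fn passed in is appended to that batch's calls list. Internally, Batch also relies on Update. The difference is that a single Update call iterates over all the cached calls.
Update is triggered in one of two cases:
First: MaxBatchDelay has elapsed since the current batch was created, so its timer fires and triggers Update.
Second: len(db.batch.calls) >= db.MaxBatchSize, i.e. the number of cached calls has reached MaxBatchSize, which also triggers Update.
In essence, Batch replaces "one write, one flush to disk" per call with "many writes, one flush to disk", which improves throughput. This only pays off when many goroutines call Batch concurrently.
The code for Batch is fairly long, so it is not reproduced here.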
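5.7 Summary
This chapter covered the topmost object in boltdb, DB. We first introduced the definition of DB. We then looked at Open(), which creates a DB, and at the interfaces DB exposes, which are the APIs used most often in practice. After introducing these interfaces, we analyzed the source of each one. With the knowledge from the previous chapters, these implementations are relatively simple, because they are mostly wrappers around the Tx, Bucket and node types covered earlier and ultimately call the methods introduced there. This concludes our source-code analysis of bolt.
Chapter 6 lists source-code analyses written by other experienced engineers for readers who want to dig further.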
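6. References
A summary written after reading the boltDB source
A repository of annotated boltdb source code
The official boltdb repository
A collection of WeChat official-account articles analyzing the boltdb source
7. Conclusion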
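boltdb also ships with a command-line tool. It is mainly used to inspect all the pages in a boltdb file and the data stored on each type of page, and to run performance tests. I plan to add the features this tool supports to this article later.
Before doing this, I thought source-code analysis of a framework or component was mostly a matter of adding comments to the code and drawing diagrams. Only when I wrote one myself from start to finish did I realize how many details had to be re-understood and pinned down. Overall, it was a rewarding experience.
Finally, this article is based on my own understanding and reading of the source code, so it inevitably contains mistakes or misunderstandings. If you spot any problems, please let me know. Discussion is always welcome.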