大白话讲讲 Go 语言的 sync.Map(二)

简介: 上一篇文章《大白话讲讲 Go 语言的 sync.Map(一)》讲到 entry 数据结构,原因是 Go 语言标准库的 map 不是线程安全的,通过加一层抽象回避这个问题……

上一篇文章 《大白话讲讲 Go 语言的 sync.Map(一)》 讲到 entry 数据结构,原因是 Go 语言标准库的 map 不是线程安全的,通过加一层抽象回避这个问题。

当一个 key 被删除的时候,比如李四销户了,以前要撕掉小账本,现在可以在大账本上写 expunged,

对,什么也不写也是 OK 的。也就是说,

entry.p 可能是真正的数据的地址,也可能是 nil,也可能是 expunged。

为什么无端端搞这个 expunged 干嘛?因为 sync.Map 实际上是有两个小账本,

一个叫 readOnly map(只读账本),一个叫 dirty map(可读、也可写账本):

type Map struct {
    mu sync.Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry
    misses int
}

type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

既然有账本一个变成两个,那肯定会有些时候出现两个 map 数据是不一致的情况。

readOnly 结构的 amended 字段,是一个标记,为 true 的时候代表 dirty map 包含了一些 key,这些 key 不会存在 readOnly map 中。

这个字段的作用,在于加速查找的过程。

假设 readOnly 账本上有 张三、李四、钱五,dirty 账本除了这三个人,后面又新增了 王六,查找逻辑就是这样的:

  1. 先在 readOnly 查找,王六不在
  2. 判断 amended ,发现两个账本数据是不一致的
  3. 再去 dirty 账本查找,终于找到王六

如果 2 的 amended 标记是两个账本数据一致,那就没有执行 3 的必要了。

我们可以看看源码是怎么实现的:

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
  read, _ := m.read.Load().(readOnly)
  // 1. 先在 readOnly 查找,王六不在
  e, ok := read.m[key]
  // 2. 判断 amended ,发现两个账本数据是不一致的
  if !ok && read.amended {
    // 加锁的原因是,前面步骤 1 的读取有可能被另一个协程的 missLocked 更改了
    // 导致读出来的值不符合预期,所以加锁再读取一次,老套路了。
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    e, ok = read.m[key]
    if !ok && read.amended {
      // 3. 再去 dirty 账本查找,终于找到王六
      e, ok = m.dirty[key]
      // missLocked 拥有一个计数器,
      // 它的作用在于 readOnly 如果一直查不到,经常退化到 dirty,
      // 那就把 dirty 作为 readOnly ,直接取代它。
      m.missLocked()
    }
    m.mu.Unlock()
  }
  if !ok {
    return nil, false
  }
  return e.load() // 还记得大账本吗?这里是拿到最终的值,针对 entry.p == expunged 做了特殊处理。
}

func (m *Map) missLocked() {
  // 查不到就递增 misses 计数器
  m.misses++
  // 这个判断条件不是常数,而是 dirty map 的记录数。
  // 这个判断条件很奇妙,
  // 它使得 dirty 取代 readOnly 的时机,和 dirty 的数据量正相关了。
  // 也就是说,dirty map 越大,对两个 map 不一致的容忍度越大,
  // 不会有频繁的取代操作。
  if m.misses < len(m.dirty) {
    // 如果不是经常查不到,说明 readOnly 还是可以用的,退出。
    return
  }
  // 如果 readOnly 已经没有存在价值,那就把 dirty 取代 readOnly。
  // 此时,dirty 置空,并把 misses 计数器置 0。
  // read 和 dirty 的数据类型都是 map[interface{}]*entry,
  // 可以直接替换,无需类型转换,这个设计简直完美。
  m.read.Store(readOnly{m: m.dirty})
  m.dirty = nil
  m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
  p := atomic.LoadPointer(&e.p)
  // entry.p 可能是真正的数据的地址,也可能是 nil,也可能是 expunged
  if p == nil || p == expunged {
    // nil 或者是 expunged 都是不存在的,返回空
    return nil, false
  }
  // 如果是真正的数据地址,那就返回真正的数据(就是拿到大账本的某一页纸上的内容)
  return *(*interface{})(p), true
}

到这里已经讲完数据读取这部分的代码了,接着再讲数据是怎么写入的。

上一篇文章我留了一个思考题,

为什么小账本不能做到同时修改?限于篇幅,我不会展开。

我现在解答我们有了大账本,是如何做到同时修改的!

答案在这里:

// tryStore 顾名思义,就是不断尝试的意思。
// 你可以看到有一个无条件的死循环,只有某些条件满足的时候才会退出
// 计算机术语:自旋(自己一直在旋转)
func (e *entry) tryStore(i *interface{}) bool {
  for {
    p := atomic.LoadPointer(&e.p)
    // readOnly map 存储的是 entry 结构,p 就是所谓的大账本,
    // p 指向大账本上某一页纸上的内容,
    // 当账本查不到的时候,返回查不到。
    if p == expunged {
      return false
    }
    // 当账本可以查到的时候,使用 CAS 把旧的值,替换为新的值。
    // 可以查到并替换成功,返回成功,函数退出
    // 查不到或者替换失败,自旋,重试,直到成功为止
    if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
      return true
    }
  }
}

问题来了, CAS(Compare and Swap,比较并交换)是什么东西?我们看这个加法函数:

func add(delta int32) {
  for {
    // 把原先的值取出来
    oldValue := atomic.LoadInt32(&addr)
    // 读取后,如果没有其他人对它修改(Compare)
    // 那就用 oldValue+delta 新值,替换掉原来的值(Swap)
    // 成功程序退出,失败了就自旋重试(可能被其他人改了导致 Compare 不成功)
    if atomic.CompareAndSwapInt32(&addr, oldValue, oldValue+delta) {
      return
    }
  }
}

越来越有趣了,atomic.CompareAndSwapInt32 到底是个啥子哟?

它的具体实现在 src/runtime/internal/atomic/asm_amd64.s 里(不同 CPU 架构,使用的文件不同,这里以最常见的 amd64 为例):

// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//  if (*val == old) {
//    *val = new;
//    return 1;
//  } else
//    return 0;
TEXT runtime∕internal∕atomic·Cas(SB),NOSPLIT,$0-17
  MOVQ  ptr+0(FP), BX
  MOVL  old+8(FP), AX
  MOVL  new+12(FP), CX
  LOCK
  CMPXCHGL  CX, 0(BX)
  SETEQ  ret+16(FP)
  RET

FP(Frame pointer: arguments and locals):

函数的输入参数,格式 symbol+offset(FP),symbol 没有实际意义,只为了增强代码可读性,但没有 symbol 程序无法编译。

ptr+0(FP) 代表第一个参数,取出复制给 BX 寄存器。

由于 ptr 是一个指针,在 64 位的处理器中,一个指针的占 8 个字节,

所以第二个参数 old+8(FP),偏移量 offset 等于 8,

而第三个参数 new+12(FP),偏移量再加 4 的原因是 int32 占据 4 个字节。

LOCK 指令前缀会设置处理器的 LOCK# 信号,锁定总线,阻止其他处理器接管总线访问内存,

设置 LOCK# 信号能保证某个处理器对共享内存的独占使用。

CMPXCHGL CX, 0(BX) 是比较并交换的指令,将 AX 和 CX 比较,相同将 BX 指向的内容 放入 CX,

CMPXCHGL 暗中使用了 AX 寄存器。

兜了一大圈,终于明白大账本的数据是怎样被更新的了。

看看数据是怎么写入之前,我们要知道数据是怎么被删除的:

// 删除的逻辑是比较简单的。
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
  read, _ := m.read.Load().(readOnly)
  e, ok := read.m[key]
  // key 不存在的时候并且 readOnly map 和 dirty map 不一致时,
  // 把 dirty map 对应的记录删了。
  if !ok && read.amended {
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    e, ok = read.m[key]
    if !ok && read.amended {
      // 数据不一致的时候,最终读出来的值以 dirty map 为主,
      // 即使 readOnly map 是 !ok 的,但 dirty map 可能是 ok 的,
      // 既然值可能是存在的,那就读取出来。
      e, ok = m.dirty[key]
      // 删除操作
      delete(m.dirty, key)
      // 递增数据不一致的计数器。
      // 太多不一致会把 dirty map 提升为 readOnly map,前面讲过了。
      m.missLocked()
    }
    m.mu.Unlock()
  }
  // key 存在的时候,把 key 置为 nil,注意这里不是 expunged,
  // 这也是我为什么要先讲 Delete 的原因。
  if ok {
    return e.delete()
  }
  return nil, false
}

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
  m.LoadAndDelete(key)
}

// delete 将对应的 key 置为 nil!而不是 expunged!
func (e *entry) delete() (value interface{}, ok bool) {
  for {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
      return nil, false
    }
    if atomic.CompareAndSwapPointer(&e.p, p, nil) {
      return *(*interface{})(p), true
    }
  }
}

OK,我们看数据写入的逻辑,它是整个源码中最难理解的,隐含的逻辑关系非常多:

// unexpungeLocked 将 expunged 的标记变成 nil。
func (e *entry) unexpungeLocked() (wasExpunged bool) {
  return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// storeLocked 将 entry.p 指向具体的值
func (e *entry) storeLocked(i *interface{}) {
  atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// tryExpungeLocked 尝试 entry.p == nil 的 entry 标记为删除(expunged)
func (e *entry) tryExpungeLocked() (isExpunged bool) {
  p := atomic.LoadPointer(&e.p)
  // for 循环的作用,可以保证 p != nil,
  // 保证写时复制过程中,p == nil 的情况不会被写到 dirty map 中。
  for p == nil {
    if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
      return true
    }
    p = atomic.LoadPointer(&e.p)
  }
  return p == expunged
}

// dirtyLocked 写时复制,两个 map 都找不到新增的 key 的时候调用的。
func (m *Map) dirtyLocked() {
  // dirty 被置为 nil 的情景还记得吗?
  // 
  // 当 readOnly map 一直读不到,需要退化到 dirty map 读取的时候,
  // dirty map 会被提升为 readOnly map,
  // 此时,dirty map 就会被置空。
  //
  // 但是,dirtyLocked 被调用之前,
  // 都是判断 read.amended 是否为 false
  // if !read.amended {...}
  // 个人认为,可以直接判断 if m.dirty == nil {...},
  // 代码可读性更强!下面三行代码也可以不要了。
  if m.dirty != nil {
    return
  }
  // 遍历 readOnly map,把里面的内容都复制到新创建的 dirty map 中。
  read, _ := m.read.Load().(readOnly)
  m.dirty = make(map[interface{}]*entry, len(read.m))
  for k, e := range read.m {
    // tryExpungeLocked 将 entry.p == nil 设置为 expunged,
    // 遍历之后,所有的 nil 都变成 expunged 了。
    // 返回 false 说明 p 是有值的,要拷贝到 dirty 里。
    // Delete 操作会把有值的状态,转移为 nil,
    // 并不会把 expunged 状态转移为 nil,
    // 由于 for 循环的存在,p 也不会等于 nil,
    // 也就是说,tryExpungeLocked 的 p == expunged 是可以信任的。
    if !e.tryExpungeLocked() {
      // 如果没有被删除,拷贝到 dirty map 中。
      m.dirty[k] = e
    }
  }
}

func (m *Map) Store(key, value interface{}) {
  // 如果 readOnly map 有对应的 key,
  // 通过 e.tryStore 直接写入(就是上面更新大账本的整个过程),
  // 注意,tryStore 会在 entry.p == expunged 的情况下失败。
  read, _ := m.read.Load().(readOnly)
  if e, ok := read.m[key]; ok && e.tryStore(&value) {
    return
  }
  // readOnly map 找不到,或者 key 被删除了,
  // 那就写到 dirty map 里面。
  m.mu.Lock()
  read, _ = m.read.Load().(readOnly)
  if e, ok := read.m[key]; ok {
    // unexpungeLocked 将 expunged 的标记变成 nil。
    // 当 entry.p == expunged,并且成功替换为 nil,
    // 返回 true。
    // 
    // 这个分支的意义在于,写时复制 dirtyLocked 的时候,
    // 数据从 readOnly map 搬迁到 dirty map 中,
    // 如果 p 是被删除的,dirty 是不会有这个 key 的,
    // 所以要把它也写进 dirty 中,保证数据的一致性。
    // 
    // 为什么好端端的 expunged,要改成 nil?
    // unexpungeLocked 是一个原子操作,成功的话,
    // 说明 p == expunged,
    // 说明写时复制已经完成。
    // 
    // 为什么要写时复制完成之后,才可以去改 dirty?
    // 我理解是这样的:
    // 如果不这样做,dirty 会被你修改成 Store 传进来的参数,
    // 写时复制又把它修改成 readOnly map 的值,
    // 所以更新 readOnly map 就好了。
    // 
    // 这一块的细节真的非常多,每一块地方都要小心处理好。
    if e.unexpungeLocked() {
      m.dirty[key] = e
    }
    // 写入值。
    e.storeLocked(&value)
  } else if e, ok := m.dirty[key]; ok {
    // 如果 dirty map 存在就直接更新进去,这个很好理解,
    // 因为 readOnly map 找不到会来 dirty 查。
    e.storeLocked(&value)
  } else {
    // 两个 map 都找不到的时候,说明这是一个新的 key。
    // 
    // 1. 如果 dirty 之前被提升为 readOnly,那就导一份没有被删除的 key 进来。
    // 
    // 这个判断条件,我理解等价于 if m.dirty == nil {...}
    if !read.amended {
      // 初始化 m.dirty,并把值写进去(写时复制)
      m.dirtyLocked()
      // amended 设置为不一致。
      // amended 表示 dirty 是否包含了 readOnly 没有的记录,
      // 很明显,read.m[key] 是 !ok 的,
      // 下面把值存到 dirty map 里面了。
      m.read.Store(readOnly{m: read.m, amended: true})
    }
    // 2. 这里,把值存到 dirty map 中。
    m.dirty[key] = newEntry(value)
  }
  m.mu.Unlock()
}

精妙绝伦!整个写入的逻辑就讲完了,最后看看遍历吧,非常简单:

func (m *Map) Range(f func(key, value interface{}) bool) {
  read, _ := m.read.Load().(readOnly)
  // 如果不一致,就把 dirty 提升为 readOnly,
  // 同时 dirty 置空,
  // 因为 dirty map 也包含了 readOnly map 没有的 key。
  if read.amended {
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if read.amended {
      read = readOnly{m: m.dirty}
      m.read.Store(read)
      m.dirty = nil
      m.misses = 0
    }
    m.mu.Unlock()
  }
  // 遍历 readOnly map 的数据,执行回调函数。
  for k, e := range read.m {
    v, ok := e.load()
    if !ok {
      continue
    }
    if !f(k, v) {
      break
    }
  }
}

好了,到这里整个 sync.Map 就讲完了,剩下的代码也没多少了,套路差不多,我们总结一下:

  1. 在读多写少的场景下,sync.Map 的性能非常高,因为访问 readOnly map 是无锁的;
  2. Load:先查找 readOnly map,找不到会去找 dirty map,如果经常没命中,dirty map 会被提升为 readOnly map,提升的时机跟 dirty 的大小相关,dirty 越大,容忍不命中的次数就越多,也就越难提升;
  3. Delete:当 readOnly map 的 key 不存在的时候,会去删除 dirty map 中的 key;如果 readOnly map 的 key 存在,entry.p 置为 nil;
  4. Store :
    a. readOnly map 的 key 存在时,entry.p != expunged 时直接更新,entry.p == expunged 就改成 nil,此时数据也同步写入 dirty map;
    b. readOnly map 的 key 不存在时,dirty map 有就更新进去,两个都没有,触发写时复制机制:搬迁 readOnly map 的没有被删除的 key 到 dirty map 中,新值写入 dirty map,并设置 amended 标记为 true。
  5. sync.Map 的缺陷在于读少写多的时候,dirty map 会被一直更新,misses 次数增加,dirty 置空后,数据又重新从 readOnly map 同步回去,使得 sync.Map 忙于数据搬迁工作,影响性能。

这篇文章近 5000 字(第一篇差不多 2000 字),从构思、成文到校对,真的需要花费不少时间,希望对你有帮助!


文章来源于本人博客,发布于 2021-05-05,原文链接:https://imlht.com/archives/258/

目录
相关文章
|
2天前
|
测试技术 Go
go语言中测试工具
【10月更文挑战第22天】
10 4
|
2天前
|
SQL 关系型数据库 MySQL
go语言中数据库操作
【10月更文挑战第22天】
12 4
|
2天前
|
缓存 前端开发 中间件
go语言中Web框架
【10月更文挑战第22天】
13 4
|
5天前
|
Go
go语言的复数常量
【10月更文挑战第21天】
18 6
|
5天前
|
Go
go语言的浮点型常量
【10月更文挑战第21天】
13 4
|
5天前
|
Serverless Go
Go语言中的并发编程:从入门到精通
本文将深入探讨Go语言中并发编程的核心概念和实践,包括goroutine、channel以及sync包等。通过实例演示如何利用这些工具实现高效的并发处理,同时避免常见的陷阱和错误。
|
2天前
|
安全 测试技术 Go
Go语言中的并发编程模型解析####
在当今的软件开发领域,高效的并发处理能力是提升系统性能的关键。本文深入探讨了Go语言独特的并发编程模型——goroutines和channels,通过实例解析其工作原理、优势及最佳实践,旨在为开发者提供实用的Go语言并发编程指南。 ####
|
2月前
|
Go 定位技术 索引
Go 语言Map(集合) | 19
Go 语言Map(集合) | 19
|
2月前
|
存储 前端开发 API
ES6的Set和Map你都知道吗?一文了解集合和字典在前端中的应用
该文章详细介绍了ES6中Set和Map数据结构的特性和使用方法,并探讨了它们在前端开发中的具体应用,包括如何利用这些数据结构来解决常见的编程问题。
ES6的Set和Map你都知道吗?一文了解集合和字典在前端中的应用
|
1月前
|
存储 分布式计算 Java
Stream很好,Map很酷,但答应我别用toMap():Java开发中的高效集合操作
在Java的世界里,Stream API和Map集合无疑是两大强大的工具,它们极大地简化了数据处理和集合操作的复杂度。然而,在享受这些便利的同时,我们也应当警惕一些潜在的陷阱,尤其是当Stream与Map结合使用时。本文将深入探讨Stream与Map的优雅用法,并特别指出在使用toMap()方法时需要注意的问题,旨在帮助大家在工作中更高效、更安全地使用这些技术。
35 0