zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【Go】sync.atomic

Go Sync atomic
2023-06-13 09:14:00 时间

Mutexes do no scale. Atomic loads do.

atomic

atomic 包中提供许多基本数据类型的原子操作,主要可以分为下面几类:

  1. 原子交换
  2. CAS
  3. 原子加法
  4. 原子取值
  5. 原子赋值
  6. Value

原子交换

这一类方法的作用是将 new 存储到地址 addr 并返回该地址上原来的值。

func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

CAS

这一类方法的作用是拿 addr 上的值和 old 比较,如果相等,就把 new 存储到 addr

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

CSA 是轻量级锁的一种常见实现方法,如:

func casADD() {
    defer w.Done()
    for i := 0; i < 10000; i++ {
        for old := a; !atomic.CompareAndSwapInt64(&a, old, old + 1);  {
            old = a
        }
    }
}

原子加法

顾名思义,是给原来 addr 地址上的值加上 delta, 并返回最新的值,需要注意的是如果使用 AddUint64 执行 x - c 需要执行 AddUint64(&x, ^uint64(c-1)), 所以原子的 x -- 可以写为 AddUint64(&x, ^uint64(0))uint32AddUint32() 同理

func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

原子取值和赋值

原子取值顾名思义,从地址 addr 取值并返回

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

赋值同样,将 val 存储到地址 addr

func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

Value

上面虽然提供了许多方法,但其面向的类型只是数值和指针,为了扩大原子操作的范围,在 Go 1.4 的时候加入了 Value

sync.atomic.Value 结构体只有一个字段 interface{} 类型的 v

type Value struct {
    v interface{}
}

且之对外暴露了 Load()Store() 两个方法,前者用来安全地从内存中读取值,后者用来将值安全地存入内存。

除了 PublicValue 外,sync.stomic.value.go 中还定义了一个私有的结构体 ifaceWords, 它包含两个指针 typdata 前者表示值的真实类型,后者表示值的“值”, 通过把 unsafe.Pointer 转换成 ifaceWords, 我们可以得到 interface{} 真实的类型和值。

type ifaceWords struct {
    typ  unsafe.Pointer
    data unsafe.Pointer
}

value 使用起来非常简单你可以把它当作一个容器,在你需要的时候可以将一个值放到该容器里,也可以从这个容器中拿出值,唯一不同的是你做的这些事都是原子性的。

type S struct {
    a int
}

func main() {
    var v atomic.Value
    s := S{1}
    v.Store(s)
    p := v.Load()
    fmt.Println(p.(S).a)
}

Store

首先,Value 中不允许存储 nil, 对应 1 ~ 3 行, x 如果为 nil 会直接抛出一个 panic, 然后通过将原来的值 v 和 将要存储的值 x 转换成 *ifaceWords 得到 xv 的具体类型和值,接下来就是一个用 CSA 实现的轻量级锁。

进入循环中,首先会使用一个上面说过的原子操作 LoadPointer 得到 vp 的真实类型 typ,根据 typ ,可以分为三种不同的情况:

  1. typ == nil :原来存储的类型是 nil ,但 Value 本身是不允许存储 nil 值的,所以这种情况只有可能是第一次存值。
  2. uintptr(typ) == ^uintptr(0): 这说明第一次存储还没结束,这时就要循环等待。
  3. typ != xp.typ: 执行到这说明 Value 中已经有旧值了,Value 要求每次写入的值类型都要与第一次写入的值类型相同,就是在这判断的,如果 xv 的类型相同,就会调用 StorePointerx 写入 v 中了。

后面两种情况比较简单,重点在第一种情况上:

在判断里首先会调用 runtime_procPin, 按照源码注释,它的作用是设置禁止抢占,同时可以避免 GC,接下来就是 CAS,看原来的值是不是还是 nil 如果不是说明已经有 goroutine 抢先它去赋值了,这时当前 goroutine 要做的只能是自旋,等待重新拿到锁,如果原来的类型还是 nil 说明当前是安全的,然后在 CAS 中,当前 goroutine 会把 vp.typ 设置成 unsafe.Pointer(^uintptr(0)) 标识 “ 我现在正在赋值 ” 别人进来看到类型是 unsafe.Pointer(^uintptr(0)) 时就会进入上面的步骤 2 自旋等待,设置完状态后就是调用 StorePointer 把新值 x 的类型和值存储在 v 的地址上,设置允许抢占,恢复 GC ,循环结束。

func (v *Value) Store(x interface{}) {
    if x == nil {
        panic("sync/atomic: store of nil value into Value")
    }
    vp := (*ifaceWords)(unsafe.Pointer(v))   // 原来的值
    xp := (*ifaceWords)(unsafe.Pointer(&x))  // 即将存储的值
    for {
        typ := LoadPointer(&vp.typ)
        if typ == nil {          // 第一次存储值 
            runtime_procPin()    // 禁止抢占,防止 GC 看到 unsafe.Pointer(^uintptr(0)) 这个奇怪的类型
            if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
                runtime_procUnpin()
                continue         // 比较不通过,说明有别人在执行赋值,自旋等待
            }
            
            StorePointer(&vp.data, xp.data)   // 设置新置
            StorePointer(&vp.typ, xp.typ)     // 设置类型
            runtime_procUnpin()
            return
        }
        if uintptr(typ) == ^uintptr(0) {       // 赋值没结束,自旋等待
            continue
        }
        
         // 后面赋值类型必须与第一次赋值类型相同
        if typ != xp.typ {
            panic("sync/atomic: store of inconsistently typed value into Value")
        }
        StorePointer(&vp.data, xp.data)        // 只有第一次需要设置 tpy, 后面只需要设置 data 
        return
    }
}

Load

相比 StoreLoad 很简单,它任然需要通过 ifaceWords 拿到 v 的真实类型,如果 v 中没有存值或正在写入,他会直接返回 nil,否则就把 v.datav.typ 重新组装成 interface{} 返回。

func (v *Value) Load() (x interface{}) {
    vp := (*ifaceWords)(unsafe.Pointer(v))
    typ := LoadPointer(&vp.typ)
    if typ == nil || uintptr(typ) == ^uintptr(0) {
        // First store not yet completed.
        return nil
    }
    data := LoadPointer(&vp.data)
    xp := (*ifaceWords)(unsafe.Pointer(&x))
    xp.typ = typ
    xp.data = data
    return
}

总结

Value 是 Go 1.4 之后才有的一个机制,它为所有提供了类似 StoreInt64LoadInt64 的方法,这样可以避免其他对象取赋值时不得不使用锁而导致性能下降。

Mutex由操作系统实现,而atomic包中的原子操作则由底层硬件直接提供支持。在 CPU 实现的指令集里,有一些指令被封装进了atomic包,这些指令在执行的过程中是不允许中断(interrupt)的,因此原子操作可以在lock-free的情况下保证并发安全,并且它的性能也能做到随 CPU 个数的增多而线性扩展。

原子操作由底层硬件支持,而锁则由操作系统的调度器实现。锁应当用来保护一段逻辑,对于一个变量更新的保护,原子操作通常会更有效率,并且更能利用计算机多核的优势,如果要更新的是一个复合对象,则应当使用atomic.Value封装好的实现。