前言

 go的所有并发模型都是由sync.Mutex提供的同步原语来实现的,底层则进一步依赖操作系统原语,比如Linux 中的 futex(快速用户空间互斥体)

Mutex 几种状态

  1. 未锁定(Unlocked):这种情况下任何goroutine都可以通过调用Lock方法来获取锁;
  2. 已锁定(Locked):当一个goroutine获取到锁时,状态变为Locked,其他任何尝试获取这个锁的都将阻塞,直到锁被持有者通过Unlock()的方法释放
  3. 竞争状态(Contended):当Locked 状态的 Mutex 被其他 goroutine 尝试获取时,这些goroutine会进入阻塞状态,此时Mutex也就进入竞争状态(Contended),Mutex会在Unlock()调用时唤醒某个/些goroutine(进入Waken状态)来获取锁,而长时间等待的goroutine则会进入饥饿状态(Starvation)

竞争状态下的正常模式和饥饿模式

  1. 正常模式
    • Unlock()被调用时,唤醒等待的一个goroutine(通常是等待时间最长的)
    • 唤醒的goroutine不会立刻获得锁,而是和其他goroutine一起竞争锁
    • 如果唤醒的goroutine没有在一定时间内(通常1ms)获得锁,则切换为饥饿模式
  2. 饥饿模式
    • 主要解决高竞争情况下goroutine长时间等待遇到的饥饿问题
    • 当一个goroutine 等待锁超过一定时间的阈值(通常1ms)触发,饥饿模式下,锁的所有权将直接从解锁的 goroutine 交给等待队列中的下一个(即等待时间最长的那个)
    • 饥饿模式下,新到的goroutine直接加入等待队列尾部,即使锁刚好是Unlock状态
    • 当一个处于饥饿模式(starvation mode)的锁被持有的 goroutine 解锁后,并且下一个等待的 goroutine满足:
      • 可以立即获取到锁并且没有其他等待获得锁的 goroutine;
      • 是在锁变成饥饿模式前就已经在等待的;
        那么,这时会将锁切换回正常模式

Mutex允许自旋的条件

 自旋机制是用来改善锁性能的,即当一个goroutine尝试获取已locked的锁时,它可能会自旋,即重复检查锁,避免频繁切换上下文带来的开销;而长时间自旋会浪费CPU时间片(尤其是锁持有时间较长时)
 以下情况会触发自旋:

  1. CPU 核数大于 1—系统需要有足够的资源进行自旋,只有单核是做不到的
  2. 有空闲的P且本地待运行队列为空—即需要系统的cpu使用率相对较低,否则还是优先保证现有程序的执行
  3. 积累的自旋次数小于最大自旋次数(active_spin=4)
  4. 锁已被占用,并且锁不处于饥饿模式—锁占用才会用到自旋,而如果触发了饥饿则不会触发竞争,也无需自旋

RWMutex 实现

  • sync.RWMutex优化了读多写少的场景,它具有如下特性
    1. 当没有写入者时,允许多个 goroutine 持有读锁(共享锁)
    2. 写锁(排他锁)会阻止其他写锁和读锁的获取
    3. 读锁可以很快地连续进行,因为它不需要改变锁的专题,而写锁则需要等待所有读锁写锁的释放
  • sync.RWMutex基本方法如下:
    1. Lock() — 获取写锁,阻塞至没有其他读锁和写锁
    2. Unlock() — 释放写锁
    3. RLock() — 获取读锁,有写锁的情况下等待写锁释放
    4. RNLock() — 释放读锁
  • 读写锁的原理实现:
    1. 可以通过一个读锁计数器和2个Mutex实现
    2. 写操作时,首先增加等待写锁数量,如果存在活跃的读/写锁,它将阻塞,直到获得写锁后减少待写锁的数量
    3. 读操作时,如果有活跃的写锁或者等待的写锁,它将等待,反之,增加读计数器
    4. 写操作释放时,会检查是否有等待的读锁或写锁(自测是读锁优先),适当的唤醒goroutine。而读操作释放时,则会简单地减少计数器,如果这是最后一个读锁且有写锁等待,则唤醒写锁
  • 注意事项:
    1. 锁的升级和降级:go语法无法在已锁定的情况下切换锁,写锁可以降级为读锁,解锁后立刻锁读锁即可
    2. 频繁读操作会导致写锁饥饿,可能需要在应用层协助解决
    3. 锁被使用的情况下不要复制Mutex,否则可能会出现不可预知的行为
    4. 读锁可以上多次,读锁上和释放的顺序不一定,写锁只能一次
    5. 锁与内存同步,可解释如下:当你释放一个锁时,你是在表达:“我已完成在共享资源上的修改,它们可以被其他 goroutine 安全读取了。”当你获得一个锁时,你是在表达:“我想查看最新的,安全的共享资源。”

sync.Cond

Cond也是一种并发控制原语,主要侧重于条件控制,它可以让一些已进入锁定状态的goroutine在满足特定的情况下被唤醒,总是和sync.Mutexsync.RWMutex配合使用,举例如下:

	c := sync.NewCond(&sync.Mutex{})
	var ready int

	for i := 0; i < 10; i++ {
		go func(i int) {
			time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

			// 加锁更改等待条件
			c.L.Lock()
			ready++

			c.L.Unlock()
			log.Printf("运动员#%d 已准备就绪\n", i)
			// 广播唤醒所有的等待者
			c.Signal() // 随机唤醒一个,如果要全部唤醒,则使用 c.Broadcast()
		}(i)
	}

	go func() {
		c.L.Lock()
		fmt.Println("进入锁定线程1。。。")
		for ready != 10 {
			c.Wait()
			log.Println("裁判员1被唤醒一次")
		}
		c.L.Unlock()
	}()
	c.L.Lock()
	fmt.Println("进入锁定线程2。。。")
	for ready != 10 {
		c.Wait()
		log.Println("裁判员2被唤醒一次")
	}
	c.L.Unlock()

	//所有的运动员是否就绪
	log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")

注意点:

  1. Signal()Boardcast()都不一定在Lock语句内使用
  2. Signal()随机触发其中一个Wait()的线程(通常是等待时间最长的),而Boardcast() 则会让全部线程唤醒
  3. Wait()必须在锁控制语句中,否则会出现2次unlock()错误
  4. 如果没有任何Wait()的goroutine,则Signal()Boardcast()都没有任何作用
  5. 条件控制提现在包裹Wait()的for循环,如果把这个循环注释,虽然不会报错,但是条件控制就失去意义了
  6. 内部实质上就是利用锁实现了一个等待队列

WaitGroup

WaitGroup作为并发控制原语,侧重于协调等待,即等待一组goroutine全部完成
使用示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    // 启动多个 goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1) // 为每个 goroutine 增加计数
        go func(i int){
            defer wg.Done() // 在退出 goroutine 前递减计数,也是`Add(-1)`的快捷调用
            time.Sleep(2 * time.Second) // 模拟耗时操作
            fmt.Printf("Goroutine %d finished\n", i)
        }(i)
    }

    fmt.Println("Waiting for goroutines...")
    wg.Wait() // 等待所有 goroutine 调用 Done
    fmt.Println("All goroutines completed")
}

需要注意的点:

  1. Add()的数目和Done()/Add(-x)的数据必须保持一致,如果最终的计数器小于0则会触发panic
  2. wg对象不能被拷贝,应当使用指针指向,否则会出现未知错误
  3. Wait()可以被多个goroutine持有,当计数器为0时候,所有等待的goroutine都会被唤醒
  4. 底层实现:2个计数器(等待/唤醒)都是int32,组合成一个int64,使用atomic进行数值加减,最终实现一个semaphore内部信号量机制

sync.Once

Once作为并发控制原语,侧重于单次执行,即确保函数在多个goroutine,只会执行一次
使用示例:

var once sync.Once

func main() {
    for i:= 0; i < 5; i++ {
        go func(i int){
            once.Do(func(){
                fmt.Println("Only once", i)
            })
        }(i)
    }
    // 等待所有goroutines完成并查看影响
    time.Sleep(time.Second)
}

注意点:

  1. 内部实现是一个bool标记和互斥锁
  2. 通常是用于只执行一次的初始化代码
  3. 可以一定程度上实现幂等逻辑,也可用来实现单例模式
  4. 其底层是双检查的机制,即未拿到执行权的,会等待执行权的执行完成
  5. 不能嵌套once.Do,会死锁

Atomic Operation

 即原子操作,很基础的并发模型,通常作为其他并发模型实现的一个方式,现代计算机架构中,通常是机器指令层次直接支持的,比如x86的cmpxchg,go语言中则由sync/atomic 实现
例如:

vat counter int32

func increment() {
    atomic.AddInt32(&counter, 1) // 原子地将 counter 的值加 1
}

多进程调用increment()时,则像单线程执行一样,每一次都是独立的
注意点:

  1. 其通常用于非集群环境下乐观锁实现
  2. 和锁实现的区别:
    • 原子操作基于cpu支持,通常执行更快,适合简单的场景
    • 锁设计到更多复杂的机制,如锁定、阻塞、唤醒等,通常适用于较复杂的操作和数据结构,锁可能会导致一些问题如死锁,优先级反转,饥饿或过度的线程调度延迟等

CAS

 即比较并交换(Compare And Swap),一种用于同步的原子指令,通常用于多线程编程中的锁和并发数据结构的实现。CAS 是实现无锁编程(lock-free programming)原语的基础
伪代码实现为:

function CAS(V, A, B):
 if *V == A:
    *V = B
    return true
  else:
    return false

注意点:

  1. 执行较快时会导致ABA问题
  2. 通常被用来实现自旋锁和其他同步工具

sync.Pool

 一种高频临时对象的复用机制,类似线程池实现了一个“临时对象池”,示例如下:

var pool = sync.Pool {
    New: func() interface{} {
      return new(bytes.Buffer)
    },
}

// 假设处理一些工作,需要临时的 Buffer 对象
func doWork() {
    buf := pool.Get().(*byte.Buffer) // 从池中获取一个 Buffer
    // 使用 buf 进行工作...
    buf.Reset()       // 清楚 Buffer,以便复用
    pool.Put(buf)      // g工作w完成后把 Buffer 放回 pool 中
}

优点:

  1. 减少GC压力
  2. 保持实例,减少内存分配次数,提高性能
  3. 减少内存泄漏风险
  4. 不适用以下情况:
    • 创建和维护对象代价很高的场景。
    • 对象的大小非常大,可能导致池自身维护的成本超过收益。
    • 池内对象有可能导致显著的内存泄露。