前言
go的所有并发模型都是由sync.Mutex
提供的同步原语来实现的,底层则进一步依赖操作系统原语,比如Linux 中的 futex(快速用户空间互斥体)
Mutex 几种状态
- 未锁定(Unlocked):这种情况下任何goroutine都可以通过调用
Lock
方法来获取锁; - 已锁定(Locked):当一个goroutine获取到锁时,状态变为Locked,其他任何尝试获取这个锁的都将阻塞,直到锁被持有者通过
Unlock()
的方法释放 - 竞争状态(Contended):当Locked 状态的 Mutex 被其他 goroutine 尝试获取时,这些goroutine会进入阻塞状态,此时Mutex也就进入竞争状态(Contended),Mutex会在
Unlock()
调用时唤醒某个/些goroutine(进入Waken状态)来获取锁,而长时间等待的goroutine则会进入饥饿状态(Starvation)
竞争状态下的正常模式和饥饿模式
- 正常模式
- 当
Unlock()
被调用时,唤醒等待的一个goroutine(通常是等待时间最长的) - 唤醒的goroutine不会立刻获得锁,而是和其他goroutine一起竞争锁
- 如果唤醒的goroutine没有在一定时间内(通常1ms)获得锁,则切换为饥饿模式
- 当
- 饥饿模式
- 主要解决高竞争情况下goroutine长时间等待遇到的饥饿问题
- 当一个goroutine 等待锁超过一定时间的阈值(通常1ms)触发,饥饿模式下,锁的所有权将直接从解锁的 goroutine 交给等待队列中的下一个(即等待时间最长的那个)
- 饥饿模式下,新到的goroutine直接加入等待队列尾部,即使锁刚好是Unlock状态
- 当一个处于饥饿模式(starvation mode)的锁被持有的 goroutine 解锁后,并且下一个等待的 goroutine满足:
- 可以立即获取到锁并且没有其他等待获得锁的 goroutine;
- 是在锁变成饥饿模式前就已经在等待的;
那么,这时会将锁切换回正常模式
Mutex允许自旋的条件
自旋机制是用来改善锁性能的,即当一个goroutine尝试获取已locked
的锁时,它可能会自旋,即重复检查锁,避免频繁切换上下文带来的开销;而长时间自旋会浪费CPU时间片(尤其是锁持有时间较长时)
以下情况会触发自旋:
- CPU 核数大于 1—系统需要有足够的资源进行自旋,只有单核是做不到的
- 有空闲的P且本地待运行队列为空—即需要系统的cpu使用率相对较低,否则还是优先保证现有程序的执行
- 积累的自旋次数小于最大自旋次数(active_spin=4)
- 锁已被占用,并且锁不处于饥饿模式—锁占用才会用到自旋,而如果触发了饥饿则不会触发竞争,也无需自旋
RWMutex 实现
sync.RWMutex
优化了读多写少的场景,它具有如下特性- 当没有写入者时,允许多个 goroutine 持有读锁(共享锁)
- 写锁(排他锁)会阻止其他写锁和读锁的获取
- 读锁可以很快地连续进行,因为它不需要改变锁的专题,而写锁则需要等待所有读锁写锁的释放
sync.RWMutex
基本方法如下:- Lock() — 获取写锁,阻塞至没有其他读锁和写锁
- Unlock() — 释放写锁
- RLock() — 获取读锁,有写锁的情况下等待写锁释放
- RNLock() — 释放读锁
- 读写锁的原理实现:
- 可以通过一个读锁计数器和2个Mutex实现
- 写操作时,首先增加等待写锁数量,如果存在活跃的读/写锁,它将阻塞,直到获得写锁后减少待写锁的数量
- 读操作时,如果有活跃的写锁或者等待的写锁,它将等待,反之,增加读计数器
- 写操作释放时,会检查是否有等待的读锁或写锁(自测是读锁优先),适当的唤醒goroutine。而读操作释放时,则会简单地减少计数器,如果这是最后一个读锁且有写锁等待,则唤醒写锁
- 注意事项:
- 锁的升级和降级:go语法无法在已锁定的情况下切换锁,写锁可以降级为读锁,解锁后立刻锁读锁即可
- 频繁读操作会导致写锁饥饿,可能需要在应用层协助解决
- 锁被使用的情况下不要复制Mutex,否则可能会出现不可预知的行为
- 读锁可以上多次,读锁上和释放的顺序不一定,写锁只能一次
- 锁与内存同步,可解释如下:当你释放一个锁时,你是在表达:“我已完成在共享资源上的修改,它们可以被其他 goroutine 安全读取了。”当你获得一个锁时,你是在表达:“我想查看最新的,安全的共享资源。”
sync.Cond
Cond
也是一种并发控制原语,主要侧重于条件控制,它可以让一些已进入锁定状态的goroutine在满足特定的情况下被唤醒,总是和sync.Mutex
和sync.RWMutex
配合使用,举例如下:
c := sync.NewCond(&sync.Mutex{})
var ready int
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)
// 加锁更改等待条件
c.L.Lock()
ready++
c.L.Unlock()
log.Printf("运动员#%d 已准备就绪\n", i)
// 广播唤醒所有的等待者
c.Signal() // 随机唤醒一个,如果要全部唤醒,则使用 c.Broadcast()
}(i)
}
go func() {
c.L.Lock()
fmt.Println("进入锁定线程1。。。")
for ready != 10 {
c.Wait()
log.Println("裁判员1被唤醒一次")
}
c.L.Unlock()
}()
c.L.Lock()
fmt.Println("进入锁定线程2。。。")
for ready != 10 {
c.Wait()
log.Println("裁判员2被唤醒一次")
}
c.L.Unlock()
//所有的运动员是否就绪
log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
注意点:
Signal()
和Boardcast()
都不一定在Lock
语句内使用Signal()
随机触发其中一个Wait()
的线程(通常是等待时间最长的),而Boardcast()
则会让全部线程唤醒Wait()
必须在锁控制语句中,否则会出现2次unlock()错误- 如果没有任何
Wait()
的goroutine,则Signal()
和Boardcast()
都没有任何作用 - 条件控制提现在包裹
Wait()
的for循环,如果把这个循环注释,虽然不会报错,但是条件控制就失去意义了 - 内部实质上就是利用锁实现了一个等待队列
WaitGroup
WaitGroup
作为并发控制原语,侧重于协调等待,即等待一组goroutine全部完成
使用示例:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
// 启动多个 goroutine
for i := 0; i < 5; i++ {
wg.Add(1) // 为每个 goroutine 增加计数
go func(i int){
defer wg.Done() // 在退出 goroutine 前递减计数,也是`Add(-1)`的快捷调用
time.Sleep(2 * time.Second) // 模拟耗时操作
fmt.Printf("Goroutine %d finished\n", i)
}(i)
}
fmt.Println("Waiting for goroutines...")
wg.Wait() // 等待所有 goroutine 调用 Done
fmt.Println("All goroutines completed")
}
需要注意的点:
Add()
的数目和Done()
/Add(-x)
的数据必须保持一致,如果最终的计数器小于0则会触发panic- wg对象不能被拷贝,应当使用指针指向,否则会出现未知错误
Wait()
可以被多个goroutine持有,当计数器为0时候,所有等待的goroutine都会被唤醒- 底层实现:2个计数器(等待/唤醒)都是int32,组合成一个int64,使用atomic进行数值加减,最终实现一个semaphore内部信号量机制
sync.Once
Once
作为并发控制原语,侧重于单次执行,即确保函数在多个goroutine,只会执行一次
使用示例:
var once sync.Once
func main() {
for i:= 0; i < 5; i++ {
go func(i int){
once.Do(func(){
fmt.Println("Only once", i)
})
}(i)
}
// 等待所有goroutines完成并查看影响
time.Sleep(time.Second)
}
注意点:
- 内部实现是一个bool标记和互斥锁
- 通常是用于只执行一次的初始化代码
- 可以一定程度上实现幂等逻辑,也可用来实现单例模式
- 其底层是双检查的机制,即未拿到执行权的,会等待执行权的执行完成
- 不能嵌套
once.Do
,会死锁
Atomic Operation
即原子操作,很基础的并发模型,通常作为其他并发模型实现的一个方式,现代计算机架构中,通常是机器指令层次直接支持的,比如x86的cmpxchg
,go语言中则由sync/atomic
实现
例如:
vat counter int32
func increment() {
atomic.AddInt32(&counter, 1) // 原子地将 counter 的值加 1
}
多进程调用increment()
时,则像单线程执行一样,每一次都是独立的
注意点:
- 其通常用于非集群环境下乐观锁实现
- 和锁实现的区别:
- 原子操作基于cpu支持,通常执行更快,适合简单的场景
- 锁设计到更多复杂的机制,如锁定、阻塞、唤醒等,通常适用于较复杂的操作和数据结构,锁可能会导致一些问题如死锁,优先级反转,饥饿或过度的线程调度延迟等
CAS
即比较并交换(Compare And Swap),一种用于同步的原子指令,通常用于多线程编程中的锁和并发数据结构的实现。CAS 是实现无锁编程(lock-free programming)原语的基础
伪代码实现为:
function CAS(V, A, B):
if *V == A:
*V = B
return true
else:
return false
注意点:
- 执行较快时会导致ABA问题
- 通常被用来实现自旋锁和其他同步工具
sync.Pool
一种高频临时对象的复用机制,类似线程池实现了一个“临时对象池”,示例如下:
var pool = sync.Pool {
New: func() interface{} {
return new(bytes.Buffer)
},
}
// 假设处理一些工作,需要临时的 Buffer 对象
func doWork() {
buf := pool.Get().(*byte.Buffer) // 从池中获取一个 Buffer
// 使用 buf 进行工作...
buf.Reset() // 清楚 Buffer,以便复用
pool.Put(buf) // g工作w完成后把 Buffer 放回 pool 中
}
优点:
- 减少GC压力
- 保持实例,减少内存分配次数,提高性能
- 减少内存泄漏风险
- 不适用以下情况:
- 创建和维护对象代价很高的场景。
- 对象的大小非常大,可能导致池自身维护的成本超过收益。
- 池内对象有可能导致显著的内存泄露。