// Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency.
// A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sema uint32 }
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func(m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } ... }
// Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func(m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return }
// 用来存当前goroutine等待的时间 var waitStartTime int64 // 用来存当前goroutine是否饥饿 starving := false // 用来存当前goroutine是否已唤醒 awoke := false // 用来存当前goroutine的循环次数(想一想一个goroutine如果循环了2147483648次咋办……) iter := 0 // 复制一下当前锁的状态 old := m.state // 自旋 for { // 如果是饥饿情况之下,就不要自旋了,因为锁会直接交给队列头部的goroutine // 如果锁是被获取状态,并且满足自旋条件(canSpin见后文分析),那么就自旋等锁 // 伪代码:if isLocked() and isNotStarving() and canSpin() if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 将自己的状态以及锁的状态设置为唤醒,这样当Unlock的时候就不会去唤醒其它被阻塞的goroutine了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 进行自旋(分析见后文) runtime_doSpin() iter++ // 更新锁的状态(有可能在自旋的这段时间之内锁的状态已经被其它goroutine改变) old = m.state continue } // 当走到这一步的时候,可能会有以下的情况: // 1. 锁被获取+饥饿 // 2. 锁被获取+正常 // 3. 锁空闲+饥饿 // 4. 锁空闲+正常 // goroutine的状态可能是唤醒以及非唤醒 // 复制一份当前的状态,目的是根据当前状态设置出期望的状态,存在new里面, // 并且通过CAS来比较以及更新锁的状态 // old用来存锁的当前状态 new := old
// 如果说锁不是饥饿状态,就把期望状态设置为被获取(获取锁) // 也就是说,如果是饥饿状态,就不要把期望状态设置为被获取 // 新到的goroutine乖乖排队去 // 伪代码:if isNotStarving() if old&mutexStarving == 0 { // 伪代码:newState = locked new |= mutexLocked } // 如果锁是被获取状态,或者饥饿状态 // 就把期望状态中的等待队列的等待者数量+1(实际上是new + 8) // (会不会可能有三亿个goroutine等待拿锁……) if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 如果说当前的goroutine是饥饿状态,并且锁被其它goroutine获取 // 那么将期望的锁的状态设置为饥饿状态 // 如果锁是释放状态,那么就不用切换了 // Unlock期望一个饥饿的锁会有一些等待拿锁的goroutine,而不只是一个 // 这种情况下不会成立 if starving && old&mutexLocked != 0 { // 期望状态设置为饥饿状态 new |= mutexStarving } // 如果说当前goroutine是被唤醒状态,我们需要reset这个状态 // 因为goroutine要么是拿到锁了,要么是进入sleep了 if awoke { // 如果说期望状态不是woken状态,那么肯定出问题了 // 这里看不懂没关系,wake的逻辑在下面 ifnew&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 这句就是把new设置为非唤醒状态 // &^的意思是and not new &^= mutexWoken } // 通过CAS来尝试设置锁的状态 // 这里可能是设置锁,也有可能是只设置为饥饿状态和等待数量 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果说old状态不是饥饿状态也不是被获取状态 // 那么代表当前goroutine已经通过CAS成功获取了锁 // (能进入这个代码块表示状态已改变,也就是说状态是从空闲到被获取) if old&(mutexLocked|mutexStarving) == 0 { break// locked the mutex with CAS } // 如果之前已经等待过了,那么就要放到队列头 queueLifo := waitStartTime != 0 // 如果说之前没有等待过,就初始化设置现在的等待时间 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 既然获取锁失败了,就使用sleep原语来阻塞当前goroutine // 通过信号量来排队获取锁 // 如果是新来的goroutine,就放到队列尾部 // 如果是被唤醒的等待锁的goroutine,就放到队列头部 runtime_SemacquireMutex(&m.sema, queueLifo) // 这里sleep完了,被唤醒 // 如果当前goroutine已经是饥饿状态了 // 或者当前goroutine已经等待了1ms(在上面定义常量)以上 // 就把当前goroutine的状态设置为饥饿 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 再次获取一下锁现在的状态 old = m.state // 如果说锁现在是饥饿状态,就代表现在锁是被释放的状态,当前goroutine是被信号量所唤醒的 // 也就是说,锁被直接交给了当前goroutine if old&mutexStarving != 0 { // 如果说当前锁的状态是被唤醒状态或者被获取状态,或者说等待的队列为空 // 那么是不可能的,肯定是出问题了,因为当前状态肯定应该有等待的队列,锁也一定是被释放状态且未唤醒 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 当前的goroutine获得了锁,那么就把等待队列-1 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果当前goroutine非饥饿状态,或者说当前goroutine是队列中最后一个goroutine // 那么就退出饥饿模式,把状态设置为正常 if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } // 原子性地加上改动的状态 atomic.AddInt32(&m.state, delta) break } // 如果锁不是饥饿模式,就把当前的goroutine设为被唤醒 // 并且重置iter(重置spin) awoke = true iter = 0 } else { // 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放 // 那么就更新状态,重新开始循环尝试拿锁 old = m.state } }
if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }