死锁产生的四个条件
2025/12/21大约 12 分钟
死锁产生的四个条件
基本定义
死锁(Deadlock)是指两个或多个进程/线程在执行过程中,因争夺资源而造成的一种互相等待的现象。若无外力干涉,它们都将无法继续执行下去。
死锁产生的四个必要条件
死锁的发生必须同时满足以下四个条件,缺一不可:
1. 互斥条件(Mutual Exclusion)
- 定义:资源不能被多个进程同时使用,一次只能由一个进程占用
- 说明:某个资源在一段时间内只能由一个进程持有,其他进程必须等待
- 例子:打印机、临界区资源、数据库连接
- 特点:这是资源本身的特性,通常无法破坏
2. 持有并等待条件(Hold and Wait)
- 定义:进程已经持有至少一个资源,但又申请新的资源,而该资源已被其他进程占有
- 说明:进程在等待获取新资源时,不释放已持有的资源
- 例子:进程A持有资源1,又申请资源2;进程B持有资源2,又申请资源1
- 特点:进程保持贪婪,既不放手又想要更多
3. 不可剥夺条件(No Preemption)
- 定义:进程已获得的资源在未使用完之前,不能被强行剥夺,只能由进程自己释放
- 说明:资源只能由持有者主动释放,外部无法强制回收
- 例子:某进程持有的锁不能被其他进程强制解锁
- 特点:资源的归还完全由持有者决定
4. 循环等待条件(Circular Wait)
- 定义:存在一个进程资源的循环等待链,每个进程都在等待下一个进程所持有的资源
- 说明:形成一个环形等待序列:P1等P2,P2等P3,...,Pn等P1
- 例子:P1持有R1等待R2,P2持有R2等待R3,P3持有R3等待R1
- 特点:形成闭环,谁也无法继续执行
形象化理解
经典的哲学家就餐问题:
5个哲学家围坐圆桌,每两人之间有一根筷子(共5根)
每人需要拿起左右两根筷子才能进餐
死锁场景:
┌───────────────────┐
│ 哲学家1(拿起左筷)│ → 等待右筷(被哲学家2拿着)
│ 哲学家2(拿起左筷)│ → 等待右筷(被哲学家3拿着)
│ 哲学家3(拿起左筷)│ → 等待右筷(被哲学家4拿着)
│ 哲学家4(拿起左筷)│ → 等待右筷(被哲学家5拿着)
│ 哲学家5(拿起左筷)│ → 等待右筷(被哲学家1拿着)
└───────────────────┘
形成循环等待,所有人都无法进餐!死锁的预防策略
破坏四个必要条件中的任意一个,就可以预防死锁:
1. 破坏互斥条件
- 方法:尽量让资源可共享
- 适用:适用于可共享资源(如只读文件)
- 局限:很多资源本质上无法共享(如打印机、写操作)
- 结论:通常不可行
2. 破坏持有并等待条件
- 方法1:一次性申请所有需要的资源
- 方法2:申请新资源前,释放已持有的所有资源
- 优点:实现简单
- 缺点:
- 资源利用率低(资源可能长时间被占用但未使用)
- 可能导致饥饿(无法同时获得所有资源的进程永远无法执行)
3. 破坏不可剥夺条件
- 方法:允许抢占资源
- 进程申请新资源失败时,释放已持有的资源
- 高优先级进程可以抢占低优先级进程的资源
- 优点:提高资源利用率
- 缺点:
- 实现复杂
- 可能导致数据不一致
- 某些资源无法抢占(如打印机已打印一半)
4. 破坏循环等待条件(最常用)
- 方法:对所有资源类型进行线性排序,规定进程必须按序申请资源
- 实现:
- 给每个资源编号:R1, R2, R3, ...
- 进程只能按递增顺序申请资源
- 如果已持有Ri,只能申请Rj(j > i)
- 优点:
- 破坏循环等待,彻底避免死锁
- 实现相对简单
- 缺点:
- 限制了程序的灵活性
- 资源编号的设计需要谨慎
Golang代码示例
package main
import (
"fmt"
"sync"
"time"
)
// ============ 死锁示例 ============
// 示例1:简单死锁 - 两个goroutine互相等待
func simpleDeadlock() {
var mu1, mu2 sync.Mutex
// Goroutine 1
go func() {
mu1.Lock()
fmt.Println("Goroutine 1: 获得锁1")
time.Sleep(100 * time.Millisecond) // 模拟处理时间
fmt.Println("Goroutine 1: 等待锁2...")
mu2.Lock() // 等待锁2
fmt.Println("Goroutine 1: 获得锁2")
mu2.Unlock()
mu1.Unlock()
}()
// Goroutine 2
go func() {
mu2.Lock()
fmt.Println("Goroutine 2: 获得锁2")
time.Sleep(100 * time.Millisecond) // 模拟处理时间
fmt.Println("Goroutine 2: 等待锁1...")
mu1.Lock() // 等待锁1
fmt.Println("Goroutine 2: 获得锁1")
mu1.Unlock()
mu2.Unlock()
}()
time.Sleep(3 * time.Second)
fmt.Println("发生死锁!两个goroutine都在等待")
}
// 示例2:哲学家就餐问题(死锁版本)
type Chopstick struct {
sync.Mutex
}
func philosopherDeadlock(id int, left, right *Chopstick, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 3; i++ {
// 拿起左边的筷子
left.Lock()
fmt.Printf("哲学家 %d: 拿起左筷\n", id)
// 模拟思考
time.Sleep(100 * time.Millisecond)
// 尝试拿起右边的筷子(可能死锁)
fmt.Printf("哲学家 %d: 等待右筷...\n", id)
right.Lock()
fmt.Printf("哲学家 %d: 拿起右筷,开始进餐\n", id)
// 进餐
time.Sleep(100 * time.Millisecond)
// 放下筷子
right.Unlock()
left.Unlock()
fmt.Printf("哲学家 %d: 放下筷子\n", id)
}
}
func diningPhilosophersDeadlock() {
n := 5
chopsticks := make([]*Chopstick, n)
for i := 0; i < n; i++ {
chopsticks[i] = &Chopstick{}
}
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
left := chopsticks[i]
right := chopsticks[(i+1)%n]
go philosopherDeadlock(i, left, right, &wg)
}
wg.Wait()
}
// ============ 避免死锁的方案 ============
// 方案1:破坏持有并等待 - 一次性获取所有资源
func avoidDeadlockAllAtOnce() {
var mu1, mu2 sync.Mutex
globalLock := &sync.Mutex{} // 全局锁保护资源申请
// Goroutine 1
go func() {
globalLock.Lock()
mu1.Lock()
mu2.Lock()
globalLock.Unlock()
fmt.Println("Goroutine 1: 同时获得锁1和锁2")
time.Sleep(100 * time.Millisecond)
mu2.Unlock()
mu1.Unlock()
fmt.Println("Goroutine 1: 释放所有锁")
}()
// Goroutine 2
go func() {
time.Sleep(50 * time.Millisecond)
globalLock.Lock()
mu1.Lock()
mu2.Lock()
globalLock.Unlock()
fmt.Println("Goroutine 2: 同时获得锁1和锁2")
time.Sleep(100 * time.Millisecond)
mu2.Unlock()
mu1.Unlock()
fmt.Println("Goroutine 2: 释放所有锁")
}()
time.Sleep(1 * time.Second)
}
// 方案2:破坏循环等待 - 资源按序申请
func avoidDeadlockOrdered() {
var mu1, mu2 sync.Mutex
// 定义锁的顺序:始终先获取mu1,再获取mu2
// Goroutine 1
go func() {
mu1.Lock()
fmt.Println("Goroutine 1: 获得锁1")
time.Sleep(100 * time.Millisecond)
mu2.Lock()
fmt.Println("Goroutine 1: 获得锁2")
mu2.Unlock()
mu1.Unlock()
fmt.Println("Goroutine 1: 释放所有锁")
}()
// Goroutine 2 - 也按相同顺序获取
go func() {
time.Sleep(50 * time.Millisecond)
mu1.Lock() // 先获取mu1(不是mu2)
fmt.Println("Goroutine 2: 获得锁1")
mu2.Lock()
fmt.Println("Goroutine 2: 获得锁2")
mu2.Unlock()
mu1.Unlock()
fmt.Println("Goroutine 2: 释放所有锁")
}()
time.Sleep(1 * time.Second)
}
// 方案3:哲学家就餐问题的解决方案
func philosopherSolution(id int, left, right *Chopstick, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 3; i++ {
// 奇数号哲学家先拿左筷,偶数号先拿右筷(破坏循环等待)
var first, second *Chopstick
if id%2 == 0 {
first, second = left, right
} else {
first, second = right, left
}
first.Lock()
fmt.Printf("哲学家 %d: 拿起第一根筷子\n", id)
second.Lock()
fmt.Printf("哲学家 %d: 拿起第二根筷子,开始进餐\n", id)
// 进餐
time.Sleep(100 * time.Millisecond)
// 放下筷子
second.Unlock()
first.Unlock()
fmt.Printf("哲学家 %d: 进餐完毕,放下筷子\n", id)
}
}
func diningPhilosophersSolution() {
n := 5
chopsticks := make([]*Chopstick, n)
for i := 0; i < n; i++ {
chopsticks[i] = &Chopstick{}
}
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
left := chopsticks[i]
right := chopsticks[(i+1)%n]
go philosopherSolution(i, left, right, &wg)
}
wg.Wait()
fmt.Println("所有哲学家都进餐完毕,无死锁")
}
// 方案4:使用超时机制避免永久阻塞
func avoidDeadlockWithTimeout() {
var mu1, mu2 sync.Mutex
tryLockWithTimeout := func(mu *sync.Mutex, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
mu.Lock()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
}
}
// Goroutine 1
go func() {
mu1.Lock()
fmt.Println("Goroutine 1: 获得锁1")
if tryLockWithTimeout(&mu2, 200*time.Millisecond) {
fmt.Println("Goroutine 1: 获得锁2")
mu2.Unlock()
} else {
fmt.Println("Goroutine 1: 获取锁2超时,释放锁1")
}
mu1.Unlock()
}()
// Goroutine 2
go func() {
time.Sleep(50 * time.Millisecond)
mu2.Lock()
fmt.Println("Goroutine 2: 获得锁2")
if tryLockWithTimeout(&mu1, 200*time.Millisecond) {
fmt.Println("Goroutine 2: 获得锁1")
mu1.Unlock()
} else {
fmt.Println("Goroutine 2: 获取锁1超时,释放锁2")
}
mu2.Unlock()
}()
time.Sleep(1 * time.Second)
}
// 方案5:使用TryLock(Go 1.18+)
func avoidDeadlockWithTryLock() {
var mu1, mu2 sync.Mutex
// Goroutine 1
go func() {
mu1.Lock()
fmt.Println("Goroutine 1: 获得锁1")
time.Sleep(100 * time.Millisecond)
// 尝试获取锁2,失败则放弃
// 注意:标准库sync.Mutex没有TryLock,这里是示意
// 实际可使用github.com/go-locks等第三方库
fmt.Println("Goroutine 1: 尝试获取锁2...")
// 模拟TryLock逻辑
acquired := false
done := make(chan struct{})
go func() {
mu2.Lock()
acquired = true
close(done)
}()
select {
case <-done:
if acquired {
fmt.Println("Goroutine 1: 成功获得锁2")
mu2.Unlock()
}
case <-time.After(50 * time.Millisecond):
fmt.Println("Goroutine 1: 获取锁2失败,放弃")
}
mu1.Unlock()
}()
time.Sleep(1 * time.Second)
}
func main() {
fmt.Println("=== 死锁示例 ===")
// simpleDeadlock() // 会死锁,注释掉
// diningPhilosophersDeadlock() // 可能死锁,注释掉
fmt.Println("\n=== 避免死锁方案1:一次性获取所有资源 ===")
avoidDeadlockAllAtOnce()
fmt.Println("\n=== 避免死锁方案2:资源按序申请 ===")
avoidDeadlockOrdered()
fmt.Println("\n=== 哲学家就餐问题解决方案 ===")
diningPhilosophersSolution()
fmt.Println("\n=== 避免死锁方案3:超时机制 ===")
avoidDeadlockWithTimeout()
}死锁检测与恢复
除了预防死锁,还可以采用检测和恢复的策略:
1. 死锁检测
- 定期检查资源分配图,查找循环等待
- 使用银行家算法检测安全状态
- 维护资源请求等待图
2. 死锁恢复
- 终止一个或多个进程
- 抢占资源并分配给其他进程
- 进程回滚到之前的检查点
实际应用场景
数据库死锁:
事务1:
锁住表A的记录1
等待表B的记录2
事务2:
锁住表B的记录2
等待表A的记录1
解决方案:
- 设置事务超时
- 按相同顺序访问资源
- 减少事务持有锁的时间分布式系统死锁:
服务A持有资源1,等待服务B的资源2
服务B持有资源2,等待服务A的资源1
解决方案:
- 分布式锁超时机制
- 全局顺序访问资源
- 使用分布式死锁检测算法相关面试题
Q1: 活锁和死锁有什么区别?
答案:
- 死锁:进程互相等待,都无法继续执行,状态不变
- 活锁:进程不断改变状态以响应对方,但都无法取得实质进展
- 例子:两人在走廊相遇,都想让对方先过,不断左右移动但都无法通过
- 区别:
- 死锁中进程处于阻塞状态,活锁中进程处于运行状态
- 死锁等待资源,活锁不断尝试解决冲突但失败
- 活锁可能自行解除(随机退避),死锁不会
Q2: 饥饿(Starvation)是什么?与死锁的区别?
答案:
- 饥饿:进程长期得不到所需资源,无法继续执行
- 原因:
- 优先级低,高优先级进程不断抢占资源
- 资源分配策略不公平
- 与死锁的区别:
- 饥饿是单个进程问题,死锁是多个进程互相等待
- 饥饿可能最终获得资源,死锁永远等待
- 饥饿可通过公平调度解决,死锁需要破坏必要条件
- 解决:使用公平队列、老化技术(提升等待时间长的进程优先级)
Q3: 如何检测死锁?
答案:
- 资源分配图法:
- 构建进程-资源分配图
- 检测图中是否存在环
- 有环则可能存在死锁
- 银行家算法:
- 检查系统是否处于安全状态
- 如果无安全序列,则可能发生死锁
- 等待图法(资源分配图的简化):
- 只保留进程节点
- 如果Pi等待Pj持有的资源,则Pi→Pj
- 检测图中是否有环
- 实现:定期运行死锁检测算法,检测周期需权衡开销和响应时间
Q4: 银行家算法是什么?
答案:
- 定义:一种避免死锁的资源分配算法
- 核心思想:在分配资源前,先判断分配后系统是否处于安全状态
- 安全状态:存在一个进程序列,使得每个进程都能获得所需资源并完成
- 步骤:
- 进程申请资源
- 系统试探性分配
- 检查是否存在安全序列
- 如果安全则真正分配,否则拒绝并等待
- 数据结构:
- Available:可用资源向量
- Max:最大需求矩阵
- Allocation:已分配矩阵
- Need:还需要资源矩阵(Need = Max - Allocation)
- 缺点:
- 需要预知进程的最大资源需求
- 进程数量固定
- 实际应用中较少使用
Q5: 两阶段锁协议是什么?
答案:
- 定义:数据库中避免死锁的协议
- 两个阶段:
- 扩展阶段(Growing Phase):只能获取锁,不能释放锁
- 收缩阶段(Shrinking Phase):只能释放锁,不能获取锁
- 作用:保证事务的可串行化
- 严格两阶段锁:所有排他锁在事务提交/回滚时才释放
- 限制:虽然保证串行化,但不能完全避免死锁,仍需要死锁检测/超时机制
Q6: 如何避免数据库死锁?
答案:
- 访问顺序一致:所有事务按相同顺序访问表和记录
- 减少事务范围:尽量缩短事务持有锁的时间
- 降低隔离级别:使用较低的隔离级别(如Read Committed)
- 使用索引:避免表锁,使用行锁
- 避免用户交互:事务中不要等待用户输入
- 设置超时:SET LOCK_TIMEOUT设置超时时间
- 死锁检测:数据库自动检测并回滚代价小的事务
- 批处理分批:大批量更新分批进行,减少锁持有时间
Q7: Golang中如何避免死锁?
答案:
- 按序加锁:始终按相同顺序获取多个锁
- 使用defer释放锁:确保锁一定被释放
- 使用channel通信:遵循"不要通过共享内存通信,通过通信共享内存"
- 使用select + default:非阻塞的channel操作
- 设置超时:使用context.WithTimeout
- 减少锁粒度:缩小临界区范围
- 避免嵌套锁:尽量不在持有锁时获取另一个锁
- 使用sync.Once:单次初始化避免竞争
- 工具检测:使用
go run -race检测竞态条件
关键点总结
死锁四个条件(必须同时满足):
- 互斥:资源独占
- 持有并等待:已有资源但又申请新资源
- 不可剥夺:资源不能被强制回收
- 循环等待:形成等待环路
预防死锁:破坏四个条件之一
- 最常用:破坏循环等待(资源按序申请)
- 其次:破坏持有并等待(一次性申请所有资源)
避免死锁:动态检测是否安全(银行家算法)
检测和恢复:定期检测,发现后终止进程或抢占资源
实践建议:
- 优先使用资源排序策略
- 设置合理的超时时间
- 减少锁的持有时间
- 使用高级并发原语(channel、actor模型)
- 工具辅助检测(race detector、死锁检测工具)