文章目录
- 1、简述问题
- 2、原子操作(atomic_ops )
- 指令解析:
- 3、锁函数说明
- 3.1、自旋锁
- API
- 例子
- 3.2、信号量(semaphore)
- API
- 例子
- 3.3、互斥量/锁
- API
- 例子
- 3.4、信号量和互斥锁的区别
- 4、锁的内核实现
- 4.1、自旋锁(include\linux\spinlock.h)
- 4.1.1、自旋锁在UP系统(单CPU系统)中的实现
- 4.1.2、自旋锁在SMP系统(多CPU系统)中的实现
- 4.2、信号量semaphore
- 4.3、互斥锁mutex_lock
- 4.4、互斥锁和自旋锁的区别
1、简述问题
为了防止某一个设备同时被调用:
失败例子1:
static int valid = 1;/* 打开 */static ssize_t gpio_key_drv_open (struct inode *node, struct file *file){/* 第一次进来为1, 进入else, 执行else期间进不来 */if (!valid){return -EBUSY;}else{valid = 0;}return 0; //成功}static int gpio_key_drv_close (struct inode *node, struct file *file){valid = 1;return 0;}
此程序由于时间片轮转(比如 if (!valid)是恰好发生中断),无法保证多个程序不会同时进入。
失败例子2:
static int valid = 1;static ssize_t gpio_key_drv_open (struct inode *node, struct file *file){if (--valid){valid++;return -EBUSY;}return 0;}static int gpio_key_drv_close (struct inode *node, struct file *file){valid = 1;return 0;}
先减1再判断,这样可以更大概率地避免问题,但是还是不能确保万无一失。if (–valid)对数据的修改分为3步:读出来、修改、写进去。
失败的例子3:
static int valid = 1;static ssize_t gpio_key_drv_open (struct inode *node, struct file *file){unsigned long flags;raw_local_irq_save(flags); // 关中断if (--valid){valid++;raw_local_irq_restore(flags); // 恢复之前的状态return -EBUSY;}raw_local_irq_restore(flags); // 恢复之前的状态return 0;
}static int gpio_key_drv_close (struct inode *node, struct file *file){valid = 1;return 0;}
对于单CPU核的系统上述代码是没问题的;但是对于SMP系统,你只能关闭当前CPU核的中断,别的CPU核还可以运行程序,它们也可以来执行这个函数,同样导致问题。
2、原子操作(atomic_ops )
函数文件:arch\arm\include\asm\atomic.h
查看内核实现:
#define atomic_inc(v) atomic_add(1, v)
ATOMIC_OPS(add, +=, add)
#define ATOMIC_OPS(op, c_op, asm_op) \ATOMIC_OP(op, c_op, asm_op) \ATOMIC_OP_RETURN(op, c_op, asm_op) \ATOMIC_FETCH_OP(op, c_op, asm_op)
ATOMIC_OPS(add, +=, add)ATOMIC_OP(add, +=, add) \ATOMIC_OP_RETURN(add, +=, add) \ATOMIC_FETCH_OP(add, +=, add)#define ATOMIC_OP(add, +=, add) \/* 内联汇编 */static inline void atomic_add(int i, atomic_t *v) \{ \unsigned long tmp; \int result; \\prefetchw(&v->counter); \__asm__ __volatile__("@ atomic_" add "\n" \/* 读出操作数3 存入操作数0 */"1: ldrex %0, [%3]\n" \/* 操作数0 + 操作数4相加赋值给操作数0 */" " add " %0, %0, %4\n" \/* 将新的操作数0写入操作数3 */" strex %1, %0, [%3]\n" \" teq %1, #0\n" \" bne 1b" \/* result:操作数0 tmp 操作数1 v->counter 操作数2 &v->counter 操作数3 i操作数4*/: "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \: "r" (&v->counter), "Ir" (i) \: "cc"); \}
指令解析:
"1: ldrex %0, [%3]\n"
加载排他指令 高明之处可以检测到执行过程中是否被打断,进而实现原子操作
ldrex 是 ARM 架构中的一个 加载排他(Load Exclusive) 指令,它通常用于原子操作和同步操作,特别是多处理器系统中进行内存操作时,确保在其他处理器没有修改数据的情况下进行操作。ldrex:该指令的含义是 Load Exclusive(加载排他)。它从指定的内存地址加载数据,并且标记该内存位置为“排他访问”。这意味着如果其他处理器或内核在执行 ldrex 后修改了该内存位置,那么执行与该 ldrex 配对的 strex(Store Exclusive)指令时会失败。ldrex 指令常常与 strex 配对使用,用于实现原子读-修改-写操作。%0:这是一个寄存器占位符,表示 ldrex 指令的目的寄存器。指令会把指定内存位置的数据加载到这个寄存器中。在此指令执行后,寄存器 %0 会包含内存地址 [r3] 中的数据。[%3]:这是内存地址操作数,表示要加载的内存地址。%3 是一个寄存器,存储了目标内存地址,[%3] 表示从该内存地址加载数据。
" strex %1, %0, [%3]\n"
存储排他指令 高明之处可以检测到执行过程中是否被打断,进而实现原子操作
在 ARM 汇编中,strex 是一个用于 原子操作 的指令,通常用于实现并发控制,如锁或同步机制。它是 ARM 的 Load-Store Exclusive (LSE) 指令集的一部分。下面是对您提供的指令 strex %1, %0, [%3] 的解析: strex:该指令的含义是 Store Exclusive(存储排他)。它的作用是将一个值存储到内存中,但前提是自上次执行 ldrex(Load Exclusive)指令以来,该内存位置没有被其他处理器修改过。如果内存位置被修改,strex 会失败;如果没有修改,则成功将值存储到指定的内存地址。%1, %0:这些是操作数的占位符,在汇编中通常表示寄存器。根据惯例,%1 和 %0 表示指令中的两个寄存器(或值)。具体来说:%0:存储的值,它表示要写入内存的值。%1:状态标志寄存器,它将接收执行 strex 的结果。通常 strex 会设置这个寄存器为:0:表示存储成功(即内存未被其他处理器修改)。1:表示存储失败(即内存在执行 ldrex 后被修改过)。[%3]:这是内存地址操作数。%3 代表一个寄存器,存储着要操作的内存地址。[%3] 表示将值存储到该寄存器指定的地址。
3、锁函数说明
3.1、自旋锁
spinlock函数在内核文件include\linux\spinlock.h中声明,如下表:(这些操作通过原子变量实现)
函数名 | 作用 |
---|---|
spin_lock_init(_lock) | 初始化自旋锁为unlock状态 |
void spin_lock(spinlock_t *lock) | 获取自旋锁(加锁),返回后肯定获得了锁 |
int spin_trylock(spinlock_t *lock) | 尝试获得自旋锁,成功获得锁则返回1,否则返回0 |
void spin_unlock(spinlock_t *lock) | 释放自旋锁,或称解锁 |
int spin_is_locked(spinlock_t *lock) | 返回自旋锁的状态,已加锁返回1,否则返回0 |
注意:加锁解锁之间的时间应该尽可能的少。
多CPU加自旋锁防止多CPU抢占以及多进程抢占。
单CPU加自旋锁防止多进程抢占。
spin_lock和spin_unlock还可以加上各种后缀,这表示在加锁或解锁的同时,还会做额外的事情:
后缀 | 描述 |
---|---|
_bh() | 加锁时禁止下半部(软中断),解锁时使能下半部(软中断) |
_irq() | 加锁时禁止中断,解锁时使能中断(对于全局中断无效) |
_irqsave/restore() | 加锁时禁止并中断并记录状态,解锁时恢复中断为所记录的状态 |
spin_lock_irq
和 spin_lock_irqsave
是 Linux 内核中两种用于保护临界区的自旋锁操作。它们的主要区别在于是否保存和恢复中断状态,具体如下:
1、spin_lock_irq特点:- 禁用本地 CPU 的中断 (`local_irq_disable`)。- 不保存之前的中断状态。- 释放锁时,通过配套的 `spin_unlock_irq` 重新启用中断。优点:- 适合明确知道当前中断是启用状态的场景。- 实现简单,无需保存或恢复中断状态。缺点:- 如果在调用 `spin_lock_irq` 时中断已经被禁用,释放锁后中断会被强制重新启用,可能破坏系统逻辑。2. spin_lock_irqsave特点:- 禁用本地 CPU 的中断,并保存当前中断状态。- 释放锁时,通过配套的 `spin_unlock_irqrestore` 恢复之前的中断状态。实现原理- 使用变量存储之前的中断状态,例如:unsigned long flags;local_irq_save(flags); // 保存中断状态并禁用中断优点:- 更加灵活,适用于中断状态未知的场景。- 不破坏之前的中断状态,适合更复杂的内核模块设计。缺点:- 需要额外的存储空间和时间开销,用于保存和恢复中断状态。对应的解锁函数:spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);释放自旋锁的同时,将中断状态恢复到 flags 保存的值。自旋锁在上锁的时候,上锁的时间要尽可能的短。在上锁期间不能有延时,耗时,休眠操作,还不能有copy_from_user/copy_to_user(可以被打断)的操作。(鲁棒性以及避免资源耗费过多)
API
自旋锁的API:1.定义自旋锁spinlock_t lock;2.初始化自旋锁spin_lock_init(&lock);3.上锁spin_lock(&lock)4.解锁spin_unlock(&lock)
例子
//2.初始化自旋锁
spin_lock_init(&lock);
int myled_open(struct inode *inode, struct file *file)
{//3.上锁spin_lock(&lock);if(flags != 0){spin_unlock(&lock);return -EBUSY;}flags = 1;spin_unlock(&lock);printk("%s:%d\n",__func__,__LINE__);return 0;
}int myled_close(struct inode *inode, struct file *file)
{printk("%s:%d\n",__func__,__LINE__);spin_lock(&lock);flags=0;//4.解锁spin_unlock(&lock);return 0;
}死锁:int myled_open(struct inode *inode, struct file *file){//3.上锁spin_lock(&lock);spin_lock(&lock);if(flags != 0){spin_unlock(&lock);return -EBUSY;}flags = 1;spin_unlock(&lock);printk("%s:%d\n",__func__,__LINE__);return 0;}
第二次调用 spin_lock(&lock); 时,会尝试再次获取已经持有的锁,而这是自旋锁 不支持递归加锁 的行为,可能会导致 死锁
自旋锁的基本原则自旋锁不支持递归加锁:如果同一个线程连续调用 spin_lock,会导致自己阻塞,最终进入死锁状态。加锁与解锁必须匹配:每次调用 spin_lock,都必须有对应的 spin_unlock,否则锁将永远保持被占用状态,其他线程无法获取锁。自旋锁适用于临界区代码很短的情况:如果锁的持有时间较长,应考虑使用互斥锁(mutex)或其他阻塞机制。
3.2、信号量(semaphore)
semaphore函数在内核文件include\linux\semaphore.h中声明,如下表:
函数名 | 作用 |
---|---|
DEFINE_SEMAPHORE(name) | 定义一个struct semaphore name结构体,count值设置为1 |
void sema_init(struct semaphore *sem, int val) | 初始化semaphore |
void down(struct semaphore *sem) | 获得信号量,如果暂时无法获得就会休眠返回之后就表示肯定获得了信号量在休眠过程中无法被唤醒,即使有信号发给这个进程也不处理 |
int down_interruptible(struct semaphore *sem) | 获得信号量,如果暂时无法获得就会休眠,休眠过程有可能收到信号而被唤醒,要判断返回值:0:获得了信号量-EINTR:被信号打断 |
int down_killable(struct semaphore *sem) | 跟down_interruptible类似,down_interruptible可以被任意信号唤醒,但down_killable只能被“fatal signal”唤醒,返回值:0:获得了信号量-EINTR:被信号打断 |
int down_trylock(struct semaphore *sem) | 尝试获得信号量,不会休眠,返回值:0:获得了信号量1:没能获得信号量 |
int down_timeout(struct semaphore *sem, long jiffies) | 获得信号量,如果不成功,休眠一段时间返回值:0:获得了信号量-ETIME:这段时间内没能获取信号量,超时返回down_timeout休眠过程中,它不会被信号唤醒 |
void up(struct semaphore *sem) | 释放信号量,唤醒其他等待信号量的进程 |
API
信号量:当一个进程获取到信号量之后此时另外一个进程也想获取信号量,后一个进程就处在休眠状态。1)获取不到信号量的时候不消耗cpu资源2)不会产生死锁3)工作在进程上下文4)信号量保护的临界区可以很大,里面可以 有延时,耗时,甚至休眠的操作。在临界区中可以有copy_from_user/copy_to_user函数
信号量的API:struct semaphore sema;//定义信号量void sema_init(struct semaphore *sem, int val)//val:初始化为1的是才具备互斥的效果//val:初始化为0,表示的是同步机制//上锁,如果获取不到锁就休眠void down(struct semaphore *sem);//尝试获取锁,如果获取锁成功,返回0 ,否则返回1int down_trylock(struct semaphore *sem);//解锁void up(struct semaphore *sem);
例子
//2.初始化信号量
sema_init(&sem,1);//第二个参数为1为互斥
int myled_open(struct inode *inode, struct file *file)
{/* 若>0 上锁成功,否则BUSY */if(down_trylock(&sem)){return -EBUSY;}printk("%s:%d\n",__func__,__LINE__);return 0;
}
int myled_close(struct inode *inode, struct file *file)
{printk("%s:%d\n",__func__,__LINE__);up(&sem);return 0;
}
3.3、互斥量/锁
只能被同一进程上锁解锁,mutex函数在内核文件include\linux\mutex.h中声明,如下表:
函数名 | 作用 |
---|---|
mutex_init(mutex) | 初始化一个struct mutex指针 |
DEFINE_MUTEX(mutexname) | 初始化struct mutex mutexname |
int mutex_is_locked(struct mutex *lock) | 判断mutex的状态1:被锁了(locked)0:没有被锁 |
void mutex_lock(struct mutex *lock) | 获得mutex,如果暂时无法获得,休眠返回之时必定是已经获得了mutex |
int mutex_lock_interruptible(struct mutex *lock) | 获得mutex,如果暂时无法获得,休眠;休眠过程中可以被信号唤醒,返回值:0:成功获得了mutex-EINTR:被信号唤醒了 |
int mutex_lock_killable(struct mutex *lock) | 跟mutex_lock_interruptible类似,mutex_lock_interruptible可以被任意信号唤醒,但mutex_lock_killable只能被“fatal signal”唤醒,返回值:0:获得了mutex-EINTR:被信号打断 |
int mutex_trylock(struct mutex *lock) | 尝试获取mutex,如果无法获得,不会休眠,返回值:1:获得了mutex,0:没有获得注意,这个返回值含义跟一般的mutex函数相反, |
void mutex_unlock(struct mutex *lock) | 释放mutex,会唤醒其他等待同一个mutex的线程 |
int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock) | 让原子变量的值减1,如果减1后等于0,则获取mutex,返回值:1:原子变量等于0并且获得了mutex0:原子变量减1后并不等于0,没有获得mutex |
API
互斥体:当一个进程获取到互斥体之后此时另外一个进程也想获取互斥体,后一个进程就处在休眠状态。1)获取不到信号量的时候不消耗cpu资源2)不会产生死锁3)工作在进程上下文4)信号量保护的临界区可以很大,里面可以有延时,耗时,甚至休眠的操作。在临界区中可以有copy_from_user/copy_to_user函数5)在获取不到互斥体时,此时进程会稍微等一会儿进入休眠状态,正是这个原因,对于保护进程上下文临界区比较小的资源的时候,使用互斥体比信号量效率要高。
//定义互斥体
struct mutex mutex;
//初始化互斥体
mutex_init(&mutex);
//上锁,如果获取不到锁就休眠
void mutex_lock(struct mutex *lock);
//尝试获取锁,如果获取锁成功,返回1 ,否则返回0
int mutex_trylock(struct mutex *lock);
//解锁
void mutex_unlock(struct mutex *lock);
例子
int myled_open(struct inode *inode, struct file *file)
{//mutex_lock(&lock)if(!mutex_trylock(&lock)){return -EBUSY;}printk("%s:%d\n",__func__,__LINE__);return 0;
}int myled_close(struct inode *inode, struct file *file)
{printk("%s:%d\n",__func__,__LINE__);mutex_unlock(&lock);return 0;
}
3.4、信号量和互斥锁的区别
semaphore中可以指定count为任意值,比如有10个厕所,所以10个人都可以使用厕所。而mutex的值只能设置为1或0,只有一个厕所。
是不是把semaphore的值设置为1后,它就跟mutex一样了呢?不是的。
struct mutex {/* 1: unlocked, 0: locked, negative: locked, possible waiters */atomic_t count;spinlock_t wait_lock;struct list_head wait_list;/* 此处限制互斥锁只能限制在某一进程上下文,即在A进程上锁,那他只能在A线程解锁 */
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNERstruct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXESvoid *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map dep_map;
#endif
};
而semaphore并没有这些限制,它可以用来解决“读者-写者”问题:程序A在等待数据──想获得锁,程序B产生数据后释放锁,这会唤醒A来读取数据。semaphore的锁定与释放,并不限定为同一个进程。
区别列表:
semaphore | mutex | |
---|---|---|
几把锁 | 任意,可设置 | 1 |
谁能解锁 | 别的程序、中断等都可以 | 谁加锁,就得由谁解锁 |
多次解锁 | 可以 | 不可以,因为只有1把锁 |
循环加锁 | 可以 | 不可以,因为只有1把锁 |
任务在持有锁的期间可否退出 | 可以 | 不建议,容易导致死锁 |
硬件中断、软件中断上下文中使用 | 可以 | 不可以 |
4、锁的内核实现
4.1、自旋锁(include\linux\spinlock.h)
① 对于多CPU而言: A上锁, B打转
② 对于单CPU而言:自旋锁的“自旋”功能就去掉了:只剩下禁止抢占、禁止中断,用完锁再使能系统抢占(preempt_enable),这样别人就可以来抢资源了。
SMP:Symmetric Multi-Processors,对称多处理器;
UP:Uni-Processor,系统只有一个单核CPU。
自旋锁内核结构体:(include\linux\spinlock_types.h)
typedef struct spinlock {union {struct raw_spinlock rlock;};
} spinlock_t;typedef struct raw_spinlock {arch_spinlock_t raw_lock;
} raw_spinlock_t;
/* \arch\arm\include\asm\spinlock_types.h */
typedef struct {union {u32 slock;struct __raw_tickets {
#ifdef __ARMEB__u16 next;u16 owner;
#elseu16 owner;u16 next;
#endif} tickets;};
} arch_spinlock_t;
上述__raw_tickets结构体中有owner、next两个成员,这是在SMP系统中实现spinlock的关键。
4.1.1、自旋锁在UP系统(单CPU系统)中的实现
对于“自旋锁”,它的本意是:如果还没获得锁,我就原地打转等待。等待谁释放锁? CPU : THREAD/PROCESS
对于单CPU系统,没有“其他CPU”;如果内核不支持preempt (抢占),当前在内核态执行的线程也不可能被其他线程抢占,也就“没有其他进程/线程”。所以,对于不支持preempt的单CPU系统,spin_lock是空函数,不存在抢占就不需要做其他事情。
如果单CPU系统的内核支持preempt,即当前线程正在执行内核态函数时,它是有可能被别的线程抢占的。这时spin_lock的实现就是调用“preempt_disable()”:你想抢我,我干脆禁止你运行。
/* include/linux/Spinlock.h */
static __always_inline void spin_lock(spinlock_t *lock)
{raw_spin_lock(&lock->rlock);
}
#define raw_spin_lock(lock) _raw_spin_lock(lock)
#define _raw_spin_lock(lock) __LOCK(lock)
#define __LOCK(lock) \do { preempt_disable(); ___LOCK(lock); } while (0)
#define ___LOCK(lock) \do { __acquire(lock); (void)(lock); } while (0)
# define __acquire(x) (void)0
在UP系统中spin_lock()就退化为preempt_disable(),如果用的内核不支持preempt,那么spin_lock()什么事都不用做
#define __LOCK_IRQ(lock) \do { local_irq_disable(); __LOCK(lock); } while (0)
#define __LOCK(lock) \do { preempt_disable(); ___LOCK(lock); } while (0)
假设程序A要访问临界资源,可能会有中断也来访问临界资源,可能会有程序B也来访问临界资源,那么使用spin_lock_irq()来保护临界资源:先禁止中断防止中断来抢,再禁止preempt防止其他进程来抢。
对于spin_lock_bh(),在UP系统中就退化为禁止软件中断和preempt_disable()
#define __LOCK_BH(lock) \do { __local_bh_disable_ip(_THIS_IP_, SOFTIRQ_LOCK_OFFSET); ___LOCK(lock); } while (0)
#define SOFTIRQ_LOCK_OFFSET (SOFTIRQ_DISABLE_OFFSET + PREEMPT_LOCK_OFFSET)
禁止软件中断和preempt
对于spin_lock_irqsave,它跟spin_lock_irq类似,只不过它是先保存中断状态再禁止中断:
#define __LOCK_IRQSAVE(lock, flags) \do { local_irq_save(flags); __LOCK(lock); } while (0)
/* 保存中断状态 */
#define local_irq_save(flags) \do { \raw_local_irq_save(flags); \} while (0)
#define raw_local_irq_save(flags) ((flags) = 0)
/* 禁止中断 */
#define __LOCK(lock) \do { preempt_disable(); ___LOCK(lock); } while (0)
4.1.2、自旋锁在SMP系统(多CPU系统)中的实现
要让多CPU中只能有一个获得临界资源,使用原子变量就可以实现。但是还要保证公平,先到先得。比如有CPU0、CPU1、CPU2都调用spin_lock想获得临界资源,谁先申请谁先获得。
① 一开始取号机待取号码为0
② 顾客A从取号机得到号码0,电子叫号牌显示0,顾客A上座;
取号机显示下一个待取号码为1。
③ 顾客B从取号机得到号码1,电子叫号牌还显示为0,顾客B等待;
取号机显示下一个待取号码为2。
④ 顾客C从取号机得到号码2,电子叫号牌还显示为0,顾客C等待;
取号机显示下一个待取号码为3。
⑤ 顾客A吃完离座,电子叫号牌显示为1,顾客B的号码等于1,他上座;
⑥ 顾客B吃完离座,电子叫号牌显示为2,顾客C的号码等于2,他上座;
在这个过程中,即使顾客B、C同时到店,只要保证他们从取号机上得到的号码不同,他们就不会打架。
在ARMv6及以上的ARM架构中,支持SMP系统。
owner就相当于电子叫号牌,现在谁在吃饭。next就当于于取号机,下一个号码是什么。每一个CPU从取号机上取到的号码保存在spin_lock函数中的局部变量里。类似链表操作。
源码简要分析:
static __always_inline void spin_lock(spinlock_t *lock)
{raw_spin_lock(&lock->rlock);
}/* include/linux/spinlock.h */
#define raw_spin_lock(lock) _raw_spin_lock(lock)
/* include/linux/Spinlock_api_up.h */ 单核调用此宏定义
#define _raw_spin_lock(lock) __LOCK(lock)
/* kernel/locking/Spinlock.c */ 多核调用此函数
void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{__raw_spin_lock(lock);
}
对于上面两个重名宏定义和函数:宏和函数的名字相同时,预处理器会首先处理宏定义。具体的行为取决于编译上下文。内核代码需要支持多种硬件架构(如 x86、ARM、RISC-V)和不同配置(如单核、多核)。在不同场景下,_raw_spin_lock 的实现可能有所不同。在单核系统(UP,Uniprocessor)中,自旋锁实际上是没有意义的,因为没有竞争条件。这时可以直接使用一个宏(__LOCK)实现锁操作,避免运行时开销。在多核系统(SMP,Symmetric Multiprocessing)中,自旋锁需要有更复杂的逻辑,例如使用原子操作。因此会调用真正的函数 _raw_spin_lock。通过这种机制,编译时可以根据内核配置,选择使用宏或函数。
那么:如何区分宏和函数的调用?看编译时候使能哪个。
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{preempt_disable();spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
{__acquire(lock);arch_spin_lock(&lock->raw_lock);
}
static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
{arch_spin_unlock(&lock->raw_lock);__release(lock);
}static inline void arch_spin_lock(arch_spinlock_t *lock)
{unsigned long tmp;u32 newval;arch_spinlock_t lockval;prefetchw(&lock->slock);__asm__ __volatile__(
"1: ldrex %0, [%3]\n"
" add %1, %0, %4\n"
" strex %2, %1, [%3]\n"
" teq %2, #0\n"
" bne 1b": "=&r" (lockval), "=&r" (newval), "=&r" (tmp): "r" (&lock->slock), "I" (1 << TICKET_SHIFT): "cc");while (lockval.tickets.next != lockval.tickets.owner) {wfe();lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);}smp_mb();
}
4.2、信号量semaphore
信号量结构体:
/* Please don't access any members of this structure directly */
struct semaphore {raw_spinlock_t lock; // 信号量的使用需要借助 自旋锁unsigned int count; // 代表可以使用的信号资源的数量struct list_head wait_list; // 等待的信号线程
};
获取信号量:
/*** down - acquire the semaphore* @sem: the semaphore to be acquired** Acquires the semaphore. If no more tasks are allowed to acquire the* semaphore, calling this function will put the task to sleep until the* semaphore is released.** Use of this function is deprecated, please use down_interruptible() or* down_killable() instead.*/
/* 如果semaphore中的count大于0,那么down函数就可以获得信号量;否则就休眠。在读取、修改count时,要使用spinlock来实现互斥。
休眠时,要把当前进程放在semaphore的wait_list链表中,别的进程释放信号量时去wait_list中把进程取出、唤醒 */
void down(struct semaphore *sem)
{unsigned long flags;/* 使用自旋锁上锁访问 */raw_spin_lock_irqsave(&sem->lock, flags);/* 如果semaphore中的count大于0,那么down函数就可以获得信号量; count-- 对应sem_wait */if (likely(sem->count > 0))sem->count--;/* 否则就休眠。 休眠时,要把当前进程放在semaphore的wait_list链表中,别的进程释放信号量时去wait_list中把进程取出、唤醒 对应sem_post */else__down(sem);raw_spin_unlock_irqrestore(&sem->lock, flags);
}static noinline void __sched __down(struct semaphore *sem)
{__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}/** Because this function is inlined, the 'state' parameter will be* constant, and thus optimised away by the compiler. Likewise the* 'timeout' parameter for the cases without timeouts.*/
static inline int __sched __down_common(struct semaphore *sem, long state,long timeout)
{struct task_struct *task = current;struct semaphore_waiter waiter;/* 把当前进程放入信号量的wait_list */list_add_tail(&waiter.list, &sem->wait_list);waiter.task = task;waiter.up = false;for (;;) {if (signal_pending_state(state, task))goto interrupted;if (unlikely(timeout <= 0))goto timed_out;/* 修改状态为非running */__set_task_state(task, state);/* 释放自旋锁,否则别的进程无法释放信号量 */raw_spin_unlock_irq(&sem->lock);/* 主动启动调度 */timeout = schedule_timeout(timeout);/* 被唤醒后启动自旋锁 */raw_spin_lock_irq(&sem->lock);/* 若获得信号量返回 */if (waiter.up)return 0;}timed_out:list_del(&waiter.list);return -ETIME;interrupted:list_del(&waiter.list);return -EINTR;
}
释放信号量:
/*** up - release the semaphore* @sem: the semaphore to release** Release the semaphore. Unlike mutexes, up() may be called from any* context and even by tasks which have never called down().*/
void up(struct semaphore *sem)
{unsigned long flags;/* 使用自旋锁上锁访问 */raw_spin_lock_irqsave(&sem->lock, flags);/* 如果没有其他进程在等待信号量,则调整count */if (likely(list_empty(&sem->wait_list)))sem->count++;/* 如果有其他进程在等待信号量,则count值无需调整,直接取出第1个等待信号量的进程,把信号量给它,共把它唤醒。 */else__up(sem);raw_spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __up(struct semaphore *sem)
{/* 从wait_list 中取出第一个waiter */struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,struct semaphore_waiter, list);/* 删除wait_list中的这个waiter(从链表取出节点) */list_del(&waiter->list);/* 设置标记,表明获取了此信号量 */waiter->up = true;/* 唤醒此信号量的进程 */wake_up_process(waiter->task);
}
4.3、互斥锁mutex_lock
互斥锁结构体
struct mutex {/* 1: unlocked, 0: locked, negative: locked, possible waiters */atomic_t count; /* count 要么等于0:表示上锁状态 要么等于1:表示解锁状态 */spinlock_t wait_lock; /* 自旋锁 */struct list_head wait_list; /* 等待mutex的进程 *//* 优化调试 */
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNERstruct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXESvoid *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map dep_map;
#endif
};
mutex的操作函数中有fastpath、slowpath两条路径(快速、慢速):如果fastpath成功,就不必使用slowpath。
/*** mutex_lock - acquire the mutex* @lock: the mutex to be acquired** Lock the mutex exclusively for this task. If the mutex is not* available right now, it will sleep until it can get it.** The mutex must later on be released by the same task that* acquired it. Recursive locking is not allowed. The task* may not exit without first unlocking the mutex. Also, kernel* memory where the mutex resides must not be freed with* the mutex still locked. The mutex must first be initialized* (or statically defined) before it can be locked. memset()-ing* the mutex to 0 is not allowed.** ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging* checks that will enforce the restrictions and will also do* deadlock debugging. )** This function is similar to (but not equivalent to) down().*/
void __sched mutex_lock(struct mutex *lock)
{might_sleep();/** The locking fastpath is the 1->0 transition from* 'unlocked' into 'locked' state.*/__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);mutex_set_owner(lock);
}#if __LINUX_ARM_ARCH__ < 6
#include <asm-generic/mutex-xchg.h>
#else
#include <asm-generic/mutex-dec.h>
#endif
#endif /* _ASM_MUTEX_H */__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{/* 大部分情况下,mutex当前值都是1,所以通过fastpath函数可以非常快速地获得mutex。 1 - 1 = 0; 条件不成立,获得锁 */if (unlikely(atomic_dec_return_acquire(count) < 0))/* fail_fn对应的是__mutex_lock_slowpath,若count不为1进入__mutex_lock_slowpath模式,等待解锁 */fail_fn(count);
}如果mutex当前值是0或负数,则需要调用__mutex_lock_slowpath慢慢处理:可能会休眠等待。
__mutex_lock_slowpath(atomic_t *lock_count)
{struct mutex *lock = container_of(lock_count, struct mutex, count);__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,NULL, _RET_IP_, NULL, 0);
}__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,struct lockdep_map *nest_lock, unsigned long ip,struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{struct task_struct *task = current;struct mutex_waiter waiter;unsigned long flags;int ret;/* 传入的NULL */if (use_ww_ctx) {struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))return -EALREADY;}/* 禁止抢占 */preempt_disable();mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);/* 锁的一个优化 */if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {/* got the lock, yay! */preempt_enable();return 0;}/* 自旋锁上锁 */spin_lock_mutex(&lock->wait_lock, flags);/** Once more, try to acquire the lock. Only try-lock the mutex if* it is unlocked to reduce unnecessary xchg() operations.*//* count == 1 无需等待 跳出 */if (!mutex_is_locked(lock) &&(atomic_xchg_acquire(&lock->count, 0) == 1))goto skip_wait;debug_mutex_lock_common(lock, &waiter);debug_mutex_add_waiter(lock, &waiter, task);/* add waiting tasks to the end of the waitqueue (FIFO): *//* 当前进程放入等待列表 */list_add_tail(&waiter.list, &lock->wait_list);waiter.task = task;lock_contended(&lock->dep_map, ip);for (;;) {/** Lets try to take the lock again - this is needed even if* we get here for the first time (shortly after failing to* acquire the lock), to make sure that we get a wakeup once* it's unlocked. Later on, if we sleep, this is the* operation that gives us the lock. We xchg it to -1, so* that when we release the lock, we properly wake up the* other waiters. We only attempt the xchg if the count is* non-negative in order to avoid unnecessary xchg operations:*//* 目的:检查锁是否处于可用状态, 尝试将锁的状态交换为 -1(表示锁定状态), 如果交换之前 lock->count 的值是 1,表示锁已成功获取。即:只有count事先为1,才会break跳出, atomic_xchg_acquire返回值为count交换前的值,若count不为1则向下继续执行 */if (atomic_read(&lock->count) >= 0 &&(atomic_xchg_acquire(&lock->count, -1) == 1))break;/** got a signal? (This code gets eliminated in the* TASK_UNINTERRUPTIBLE case.)*//* 收到信号退出 */if (unlikely(signal_pending_state(state, task))) {ret = -EINTR;goto err;}if (use_ww_ctx && ww_ctx->acquired > 0) {ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);if (ret)goto err;}__set_task_state(task, state);/* didn't get the lock, go to sleep: *//* 释放锁 */spin_unlock_mutex(&lock->wait_lock, flags);/* 发起调度 */schedule_preempt_disabled();/* 再次获得锁 */spin_lock_mutex(&lock->wait_lock, flags);}/* 设置当前进程为非RUNNING状态 */__set_task_state(task, TASK_RUNNING);/* 从mutex 的等待列表中移除此进程 */mutex_remove_waiter(lock, &waiter, task);/* set it to 0 if there are no waiters left: *//* 列表无人等待,将count设置为空 */if(likely(list_empty(&lock->wait_list)))atomic_set(&lock->count, 0);debug_mutex_free_waiter(&waiter);skip_wait:/* got the lock - cleanup and rejoice! */lock_acquired(&lock->dep_map, ip);mutex_set_owner(lock);if (use_ww_ctx) {struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);ww_mutex_set_context_slowpath(ww, ww_ctx);}spin_unlock_mutex(&lock->wait_lock, flags);preempt_enable();return 0;err:mutex_remove_waiter(lock, &waiter, task);spin_unlock_mutex(&lock->wait_lock, flags);debug_mutex_free_waiter(&waiter);mutex_release(&lock->dep_map, 1, ip);preempt_enable();return ret;
}
4.4、互斥锁和自旋锁的区别
互斥锁和自旋锁是多线程同步的两种常用锁机制,它们的核心区别在于资源竞争时的线程行为以及使用场景的不同。以下是详细对比:
互斥锁(Mutex)
- 互斥锁是一种阻塞式锁。
- 当一个线程尝试获取已被占用的锁时,线程会进入 睡眠状态,直到锁被释放后被唤醒。
- 互斥锁适合 锁持有时间较长 的场景,避免 CPU 时间片的浪费。
自旋锁(Spinlock)
- 自旋锁是一种非阻塞式锁。
- 当一个线程尝试获取已被占用的锁时,线程会不断地 轮询(自旋) 检查锁是否释放,直到成功获取。
- 自旋锁适合 锁持有时间非常短 的场景,减少上下文切换的开销。