zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Linux驱动之并发与竞争

2023-04-18 16:40:08 时间

并发与竞争的概念

Linux 系统是个多任务操作系统,会存在多个任务同时访问同一片内存区域,这些任务可
能会相互覆盖这段内存中的数据,造成内存数据混乱。
针对这个问题必须要做处理,严重的话可能会导致系统崩溃。
linux存在以下并发访问:
①、多线程并发访问,Linux 是多任务(线程)的系统,所以多线程访问是最基本的原因。
②、抢占式并发访问,从 2.6 版本内核开始,Linux 内核支持抢占,也就是说调度程序可以
在任意时刻抢占正在运行的线程,从而运行其他的线程。
③、中断程序并发访问,这个无需多说,学过 STM32 应该知道,硬件中断的权利可
是很大的。
④、SMP(多核)核间并发访问,现在 ARM 架构的多核 SOC 很常见,多核 CPU 存在核间并
发访问。

原子操作

原子操作就是指不能再进一步分割的操作
Linux 内核提供了一些原子操作 API 函数来完成此功能
这些API分为对整型数据操作, 以及位运算操作

原子整形操作 API 函数

函数描述
ATOMIC_INIT(int i)定义原子变量的时候对其初始化。
int atomic_read(atomic_t *v)读取 v 的值,并且返回。
void atomic_set(atomic_t *v, int i)向 v 写入 i 值。
void atomic_add(int i, atomic_t *v)给 v 加上 i 值。
void atomic_sub(int i, atomic_t *v)从 v 减去 i 值。
void atomic_inc(atomic_t *v)给 v 加 1,也就是自增。
void atomic_dec(atomic_t *v)从 v 减 1,也就是自减
int atomic_dec_return(atomic_t *v)从 v 减 1,并且返回 v 的值。
int atomic_inc_return(atomic_t *v)给 v 加 1,并且返回 v 的值。
int atomic_sub_and_test(int i, atomic_t *v)从 v 减 i,如果结果为 0 就返回真,否则返回假
int atomic_dec_and_test(atomic_t *v)从 v 减 1,如果结果为 0 就返回真,否则返回假
int atomic_inc_and_test(atomic_t *v)给 v 加 1,如果结果为 0 就返回真,否则返回假
int atomic_add_negative(int i, atomic_t *v)给 v 加 i,如果结果为负就返回真,否则返回假

原子位操作 API 函数

函数描述
void set_bit(int nr, void *p)将 p 地址的第 nr 位置 1。
void clear_bit(int nr,void *p)将 p 地址的第 nr 位清零。
void change_bit(int nr, void *p)将 p 地址的第 nr 位进行翻转。
int test_bit(int nr, void *p)获取 p 地址的第 nr 位的值。
int test_and_set_bit(int nr, void *p)将 p 地址的第 nr 位置 1,并且返回 nr 位原来的值。
int test_and_clear_bit(int nr, void *p)将 p 地址的第 nr 位清零,并且返回 nr 位原来的值。
int test_and_change_bit(int nr, void *p)将 p 地址的第 nr 位翻转,并且返回 nr 位原来的值。

自旋锁

自旋锁简介

当一个线程要访问某个共享资源的时候首先要先获取相应的锁,锁只能被一个线程持有,
只要此线程不释放持有的锁,那么其他的线程就不能获取此锁。对于自旋锁而言,如果自旋锁
正在被线程 A 持有,线程 B 想要获取自旋锁,那么线程 B 就会处于忙循环-旋转-等待状态,线
程 B 不会进入休眠状态或者说去做其他的处理,而是会一直傻傻的在那里“转圈圈”的等待锁可用。
从这里我们可以看到自旋锁的一个缺点:那就等待自旋锁的线程会一直处于自旋状态,这样会浪
费处理器时间,降低系统性能,所以自旋锁的持有时间不能太长。所以自旋锁适用于短时期的轻
量级加锁,如果遇到需要长时间持有锁的场景那就需要换其他的方法了

自旋锁结构体

typedef struct spinlock {
 	union {
 		struct raw_spinlock rlock;

 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
 		struct {
 			u8 __padding[LOCK_PADSIZE];
 			struct lockdep_map dep_map;
 			};
 #endif
 	};
 } spinlock_t;

Linux 内核使用结构体 spinlock_t 表示自旋锁

自旋锁 API 函数

函数描述
DEFINE_SPINLOCK(spinlock_t lock)定义并初始化一个自旋变量。
int spin_lock_init(spinlock_t *lock)初始化自旋锁。
void spin_lock(spinlock_t *lock)获取指定的自旋锁,也叫做加锁。
void spin_unlock(spinlock_t *lock)释放指定的自旋锁。
int spin_trylock(spinlock_t *lock)尝试获取指定的自旋锁,如果没有获取到就返回 0
int spin_is_locked(spinlock_t *lock)检查指定的自旋锁是否被获取,如果没有被获取就返回非 0,否则返回 0。

自旋锁API函数适用于SMP或支持抢占的单CPU下线程之间的并发访问

中断里面可以使用自旋锁,但是在中断里面使用自旋锁的时候,在获取锁之前一定要先禁止本地中断(也就是本 CPU 中断,对于多核 SOC来说会有多个 CPU 核),否则可能导致锁死现象的发生

SMP是指多个核心运行一个操作系统,该操作系统同等的管理多个内核,这种运行模式就是简单提高运行性能。目前支持该运行模式的操作系统有:Linux,Windows,Vxworks。
AMP的运行模式基本不会存在开销问题,尤其是在运行裸机程序时,甚至没有开销,这种模式比较适合实时性高的应用。但是两个核心之间的通信与资源共享需要有一套优秀的处理机制。
BMP运行模式与 SMP类似,同样也是一个OS管理所有的核心,但开发者可以指定将某个任务仅在某个指定内核上执行 。

函数描述
void spin_lock_irq(spinlock_t *lock)禁止本地中断,并获取自旋锁。
void spin_unlock_irq(spinlock_t *lock)激活本地中断,并释放自旋锁。
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags)保存中断状态,禁止本地中断,并获取自旋锁。
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)将中断状态恢复到以前的状态,并且激活本地中断,释放自旋锁。

自旋锁的注意事项

①、因为在等待自旋锁的时候处于“自旋”状态,因此锁的持有时间不能太长,一定要
短,否则的话会降低系统性能。如果临界区比较大,运行时间比较长的话要选择其他的并发处
理方式,比如稍后要讲的信号量和互斥体。
②、自旋锁保护的临界区内不能调用任何可能引起进程调度 API函数,否则的话可能
导致死锁。 如调用copy_from_user(), copy_to_user(), kmalloc(), msleep()等函数, 有可能导致内核崩溃
③、不能递归申请自旋锁,因为一旦通过递归的方式申请一个你正在持有的锁,那么你就
必须“自旋”,等待锁被释放,然而你正处于“自旋”状态,根本没法释放锁。结果就是自己
把自己锁死了!
④、在编写驱动程序的时候我们必须考虑到驱动的可移植性,因此不管你用的是单核的还
是多核的 SOC,都将其当做多核 SOC来编写驱动程序。 在单核的情况下, 若中断和进程可能访问同一临界
区, 进程里调用spin_lock_irqsave()是安全的, 在中断里其实不调用spin_lock()也没问题, 因为spin_
lock_irqsave()可以保证这个cpu的中断服务不被执行. 但是, 如果cpu变成多核, spin_lock_irqsave()不能屏蔽
另一个cpu的中断, 所以另一个核就可能造成并发问题, 因此无论如何, 我们在中断服务程序里也应该调用spin_lock().

读写自旋锁

自旋锁不关心锁定的临界区究竟在进行什么操作, 不管是读还是写
实际上, 对共享资源并发访问, 多个执行单元同时读取是不会有问题的, 自旋锁衍生的读写自旋锁(rwlock)可允许读
的操作并发.

读写自旋锁的API

读写锁操作 API函数分为两部分,一个是给读使用的,一个是给写使用的

函数描述
DEFINE_RWLOCK(rwlock_t lock)定义并初始化读写锁
void rwlock_init(rwlock_t *lock)初始化读写锁。

读锁

函数描述
void read_lock(rwlock_t *lock)获取读锁。
void read_unlock(rwlock_t *lock)释放读锁。
void read_lock_irq(rwlock_t *lock)禁止本地中断,并且获取读锁。
void read_unlock_irq(rwlock_t *lock)打开本地中断,并且释放读锁。
void read_lock_irqsave(rwlock_t *lock, unsigned long flags)保存中断状态,禁止本地中断,并获取读锁。
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags)将中断状态恢复到以前的状态,并且激活本地中断,释放读锁。
void read_lock_bh(rwlock_t *lock)关闭下半部,并获取读锁。
void read_unlock_bh(rwlock_t *lock)打开下半部,并释放读锁。

写锁

函数描述
void write_lock(rwlock_t *lock)获取写锁。
void write_unlock(rwlock_t *lock)释放写锁。
void write_lock_irq(rwlock_t *lock)禁止本地中断,并且获取写锁。
void write_unlock_irq(rwlock_t *lock)打开本地中断,并且释放写锁。
void write_lock_irqsave(rwlock_t *lock, unsigned long flags)保存中断状态,禁止本地中断,并获取写锁。
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags)将中断状态恢复到以前的状态,并且激活本地中断,释放读锁。
void write_lock_bh(rwlock_t *lock)关闭下半部,并获取读锁。
void write_unlock_bh(rwlock_t *lock)打开下半部,并释放读锁。

顺序锁

顺序锁是对读写锁的一种优化, 若使用顺序锁, 读执行单元不会被写执行单元阻塞, 也就是说, 读执行单元在写执行单元对被顺序锁保护的共享资源进行写操作时仍然可以继续读, 而不必等待写执行单元完成写操作, 写执行单元也不需要等待读执行单元完成读操作再进行写操作.
但是写执行单元与写执行单元之间仍然互斥.
尽管读写之间不互斥, 但是如果读执行单元操作期间, 写执行单元已经发生了写操作, 那么, 读执行单元必需重新读取数据. 以便确保数据时完整的. 所以, 这种情况下, 读端可能反复读多次同样的区域才能得到有效的数据.

顺序锁的API

获取顺序锁

函数描述
void write_seqlock(seqlock_t *sl)获取顺序锁
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags)保存中断状态,禁止本地中断,并获取顺序锁。
void write_seqlock_irq(seqlock_t *lock)禁止本地中断,并且获取顺序锁
void write_seqlock_bh(seqlock_t *lock)关闭下半部,并获取顺序锁

释放顺序锁

函数描述
void write_sequnlock(seqlock_t *sl)释放顺序锁
void wirte_sequnlock_irqrestore(seqlock *sl, unsigned long flags)将中断状态恢复到以前的状态,并且激活本地中断,释放顺序锁
void write_sequnlock_irq(seqlock *sl)打开本地中断,并且释放顺序锁。
void write_sequnlock_bh(seqlock *sl)打开下半部,并释放顺序锁。

读开始

函数
unsigned read_seqbegin(const seqlock_t *sl)
unsigned read_seqbegin_irqsave(const seqlock_t *sl, flags)

读执行单在对被顺序锁sl保护的共享资源进行访问前需要调用该函数, 该函数返回顺序锁sl的当前顺序号.

重读

函数
int read_seqretry(const seqlock_t *sl, unsigned iv)
int read_seqretry_irqrestore(const seqlock_t *sl, flags)

读执行单元在访问完被顺序锁sl保护的共享资源后需要调用该函数来检查, 在读访问期间是否有写操作, 如果有写操作, 读执行单元就需要重新进行读操作

顺序锁的使用模式如下:

do{
	seqnum = read_seqbegin(&seqlock_a);
	读操作代码
}while(read_seqretry(&seqlock_a, seqnum));

RCU(Read-Copy-Update)

RCU工作原理:
不同于自旋锁, 使用RCU的读端没有锁, 内存屏障, 原子指令类的开销, 几乎可以认为是直接读(知识简单的标明开始和结束), 而RCU的写执行单元在访问它的共享资源前复制一个副本, 然后对副本进行修改, 最后使用一个回调机制在适当的时机把指向原来数据的指针重新指向新的被修改的数据, 这个时机就是所有引用该数据的CPU都退出对共享数据读操作的时候, 等待适当的时机这一时期称为宽限期.

例子:
进程A通过RCU修改某链表的N节点内容, RCU通过构造一个新的M节点, 在M节点中修改内容, 并用M节点代替N节点, 之后进程A等待在链表前期已经存在的所有读端结束后(即宽限期, 通过synchronize_rcu()API完成), 再释放原来的N节点.

假设存在一个链表:
struct foo{
	struct list_head list;
	int a;
	int b;
	int c;
};
LIST_HEAD(head)
...
p = search(head, key);
if(p == NULL){
	//...
}
q = kmalloc(sizeof(*p), GFP_KERNEL);
*q = *p;
q->b = 2;
q->c = 3;
list_replace_rcu(&p->list, &q->list);
synchronize_rcu();
kfree(p);

RCU的API

读锁定
rcu_read_lock()
rcu_read_lock_bh()

读解锁
rcu_read_unlock()
rc_read_unlock_bh()

同步RCU
synchronize_rcu()
该函数由RCU写执行单元调用, 它将阻塞写执行单元, 直到当前CPU上所有的已经存在的读执行单元完成读临界区, 写执行单元才可以继续下一步操作.synchronize_rcu()并不需要等待后续读临界区的完成.

挂接回调
void call_rcu(struct rcu_head* head, void (*func)(struct rcu_head *rcu));
call_rcu()由RCU写执行单元调用, 与synchronize_rcu()不同的是, 它不会使写执行单元阻塞, 因而可以再中断上下文或软中断中使用. 该函数把函数func挂接到rcu回调函数链上, 然后立即返回, 挂接的回调函数会在一个宽限期结束后被执行.

rcu_assign_pointer(p, v)
给RCU保护的指针赋一个新的值

rcu_dereference(p)
读端使用rcu_dereference()获取一个RCU保护的指针, 之后既可以安全的引用它, 一般需要在rcu_read_lock()/rcu_read_unlock()保护的区间引用这个指针

rcu_access_pointer(p)
读端使用rcu_access_pointer()获取一个RCU保护的指针, 之后并不引用它, 这种情况下, 我们只关心指针本身的值, 比如可以使用该函数判断指针是否为NULL
ex:

rcu_assign_pointer() rcu_dereference()结合, 写端分配一个新的 struct foo内存, 初始化其中的成员, 之后把该结构体的地址赋值给全局的gp指针:
struct foo{
	int a;
	int b;
	int c;
};
struct foo *gp = NULL;

p = kmalloc(sizeof(*p), GFP_KERNEL);
p->a = 1;
p->b = 2;
p->c = 3;
rcu_assign_pointer(gp, p);

读端访问该片区:
rcu_read_lock();
p = rcu_dereference(gp);
if(p != NULL){
	do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

对于链表数据结构而言, Linux内核增加了专门给RCU保护的链表操作API

static inline void list_add_rcu(struct list_head *new, struct list_head *head);
该函数把链表元素new插入RCU保护的链表head的开头

static inline void list_add_tail_rcu(struct list_head *new, struct list_head *head);
该函数将元素new添加到被RCU保护的链表的尾部

static inline void list_del_rcu(struct list_head *entry);
该函数从RCU保护的链表中删除指定的元素entry.

static inline void list_replace_rcu(struct list_head *old, struct list_head *new);
它使用新的链表元素new取代旧的链表元素old

list_for_each_entry_rcu(pos, head)
该宏用于遍历由RCU保护的链表head, 只要在读执行单元临界区使用该函数, 它就可以安全的和其他RCU保护的链表操作函数(如list_add_rcu())并发运行.

ex:

链表写端代码
struct foo{
	struct list_head list;
	int a;
	int b;
	int c;
};
LIST_HEAD(head);


p = kmalloc(sizeof(*p), GFP_KERNEL);
p->a = 1;
p->b = 2;
p->c = 3;
list_add(&p->list, &head);

链表读端代码
rcu_read_lock();
list_for_each_entry_rcu(p, head, list)
if(p != NULL){
	do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

信号量

相比于自旋锁,信号量可以使线程进入休眠状态,信号量的开销要比自旋锁大,因为信号量使
线程进入休眠状态以后会切换线程,切换线程就会有开销。

①、因为信号量可以使等待资源线程进入休眠状态,因此适用于那些占用资源比较久的场
合。
②、因此信号量不能用于中断中,因为信号量会引起休眠,中断不能休眠。 (也有不休眠的信号量API)
③、如果共享资源的持有时间比较短,那就不适合使用信号量了,因为频繁的休眠、切换
线程引起的开销要远大于信号量带来的那点优势。

信号量结构体:

struct semaphore { 
    raw_spinlock_t      lock; 
    unsigned int        count; 
    struct list_head    wait_list; 
};

信号量API

函数描述
DEFINE_SEAMPHORE(name)定义一个信号量,并且设置信号量的值为 1。
void sema_init(struct semaphore *sem, int val)初始化信号量 sem,设置信号量值为 val。
void down(struct semaphore *sem)获取信号量,因为会导致休眠,因此不能在中断中使用。
int down_trylock(struct semaphore *sem);尝试获取信号量,如果能获取到信号量就获取,并且返回 0。如果不能就返回非 0,并且不会进入休眠。
int down_interruptible(struct semaphore *sem)获取信号量,和 down类似,只是使用 down进入休眠状态的线程不能被信号打断。而使用此函数进入休眠以后是可以被信号打断的。
void up(struct semaphore *sem)释放信号量

互斥体

将信号量的值设置为 1就可以使用信号量进行互斥访问了,虽然可以通过信号量实现互斥,但是 Linux提供了一个比信号量更专业的机制来进行互斥,它就是互斥体—mutex。互斥访问表示一次只有一个线程可以访问共享资源,不能递归申请互斥体。在我们编写 Linux驱动的时候遇到需要互斥访问的地方建议使用 mutex。Linux内核使用 mutex结构体表示互斥体,定义如下:

struct mutex { 
    /* 1: unlocked, 0: locked, negative: locked, possible waiters */ 
    atomic_t        count; 
    spinlock_t      wait_lock; 
};

在使用 mutex之前要先定义一个 mutex变量。在使用 mutex的时候要注意如下几点:
①、mutex可以导致休眠,因此不能在中断中使用 mutex,中断中只能使用自旋锁。
②、和信号量一样,mutex保护的临界区可以调用引起阻塞的 API函数。
③、因为一次只有一个线程可以持有 mutex,因此,必须由 mutex的持有者释放 mutex。并且 mutex不能递归上锁和解锁。

互斥体的API

函数描述
DEFINE_MUTEX(name)定义并初始化一个 mutex变量。
void mutex_init(mutex *lock)初始化 mutex。
void mutex_lock(struct mutex *lock)获取 mutex,也就是给 mutex上锁。如果获取不到就进休眠。
void mutex_unlock(struct mutex *lock)释放 mutex,也就给 mutex解锁。
int mutex_trylock(struct mutex *lock)尝试获取 mutex,如果成功就返回 1,如果失败就返回 0。
int mutex_is_locked(struct mutex *lock)判断 mutex是否被获取,如果是的话就返回1,否则返回 0。
int mutex_lock_interruptible(struct mutex *lock)使用此函数获取信号量失败进入休眠以后可以被信号打断。

完成量(Completion)

用于一个执行单元等待另一个执行单元执行完某事

完成量的API

定义完成量
struct completion my_completion;

初始化完成量
初始化或者重新初始化my_completion这个完成量的值为0(即没有完成的状态)
init_completion(&my_completion);
reinit_completion(&my_completion)

等待完成量
等待一个完成量被唤醒:
void wait_for_completion(struct completion *c);

唤醒完成量
void complete(struct completion *c);
只唤醒一个等待的执行单元
void complete_all(struct completion *c);
释放所有等待同一完成量的执行单元