zl程序教程

您现在的位置是:首页 >  系统

当前栏目

Linux 同步管理

Linux同步 管理
2023-06-13 09:17:34 时间

本篇介绍

本篇看下Linux如何实现线程安全问题

原子操作

对于基础类型操作,使用原子变量就可以做到线程安全,那原子操作是如何保证线程安全的呢?linux中的原子变量如下:

typedef struct {
    int counter;
} atomic_t;

#define ATOMIC_INIT(i) { (i) }

#ifdef CONFIG_64BIT
typedef struct {
    s64 counter;
} atomic64_t;
#endif

那如何保证原子性的呢?看下实现

#define ATOMIC_OP(op, c_op)                     \
static inline void generic_atomic_##op(int i, atomic_t *v)      \
{                                   \
    int c, old;                         \
                                    \
    c = v->counter;                         \
    while ((old = arch_cmpxchg(&v->counter, c, c c_op i)) != c) \
        c = old;                        \
}

#def

核心是使用了比较并交换函数cmpxchg,这个在部分平台上会有对应的原子指令,如果没有的话,就会通过关中断形式来保证原子性。

#define generic_cmpxchg_local(ptr, o, n) ({                 \
    ((__typeof__(*(ptr)))__generic_cmpxchg_local((ptr), (unsigned long)(o), \
            (unsigned long)(n), sizeof(*(ptr))));           \
})

如果ptr的值等于o,那么就会将n赋给ptr,并返回o,如果ptr的值不等于o,那么将ptr的值返回。

内存屏障

ARM架构终有3类内存屏障指令: 数据存储屏障(data memory barrier, DMB) 数据同步屏障(data synchronization barrier, DSB) 指令同步屏障(instruction synchronization barrier, ISB) linux 内核中的内存屏障函数如下:

barrier() 编译优化屏障,阻止编译器为了性能优化而进行指令重排
mb() 内存屏障,用于SMP和UP
rmb() 读内存屏障
wmb() 写内存屏障

smp_mb() 用于SMP场景的内存屏障
smp_rmb() 用于SMP场景的读内存屏障
smp_wmb() 用于SMP场景的写内存屏障
smp_read_barrier_depends() 读依赖屏障                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 

自旋锁

如果操作稍复杂,这时候原子变量就搞不定了,比如要操作一个链表,这时候就需要用一个机制可以保证某些操作可以不受干扰完成,自旋锁就是其中一种。结构定义如下:

/* Non PREEMPT_RT kernels map spinlock to raw_spinlock */
typedef struct spinlock {
    union {
        struct raw_spinlock rlock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
        struct {
            u8 __padding[LOCK_PADSIZE];
            struct lockdep_map dep_map;
        };
#endif
    };
} spinlock_t;

raw_spinlock的结构是和平台相关的,arm上定义如下:

typedef struct {
    union {
        u32 slock;
        struct __raw_tickets {
#ifdef __ARMEB__
            u16 next;
            u16 owner;
#else
            u16 owner;
            u16 next;
#endif
        } tickets;
    };
} arch_spinlock_t;

有没有发现自旋锁似乎并没有想象中简单? 自旋不就应该是用一个变量,然后while循环来反复读取,如果是0那就可以获取了,如果是1那就继续旋转。可是这样在多CPU上存在性能问题,比如多个CPU都在争这个锁,而有一个CPU获得该锁后,接下来大概率还是该CPU继续获得锁,因为锁变量在该CPU的L1 Cache中,访问速度明显高于其他CPU,这时候为了做到公平, 就引入了FIFO机制,slock记录锁标记,next记录等待的序号,owner负责当前锁的持有者,这样就可以将锁沿着next传递了。

那也有种情况,比如在某个CPU中持有自旋锁,然后突然来了一个中断,而中断处理程序也需要获取该锁,那就立马死锁了,这种情况怎么搞?linux中也有对应的方法, 就是持有自旋锁的时候关闭中断,等操作完后再开启,对应的api如下:

spin_lock_irqsave(spinlock_t *lock, unsigned long flags)
spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags)

信号量

对比自旋锁是忙等待,信号量就是睡眠等待,对应的结构如下:

struct semaphore {
    raw_spinlock_t      lock;
    unsigned int        count;
    struct list_head    wait_list;
};

这儿的lock是用来保护其他2个成员,count表示可以同时进入临界区的路径数目,wait_list是在该信号量上睡眠的进程。对应的操作方法如下:

extern void down(struct semaphore *sem);
extern int __must_check down_interruptible(struct semaphore *sem);
extern int __must_check down_killable(struct semaphore *sem);
extern int __must_check down_trylock(struct semaphore *sem);
extern int __must_check down_timeout(struct semaphore *sem, long jiffies);
extern void up(struct semaphore *sem);

互斥锁

互斥锁的行为和信号量的count为1时候一样,那为什么还需要重新实现一波呢?其实还是性能,信号量考虑的竞争路径不止一个,因此就决定了考虑的因素也会复杂一些,而互斥锁就是达到互斥效果就可以了,这样就可以针对这块进行优化。 结构定义如下:

struct mutex {
    atomic_long_t       owner;
    raw_spinlock_t      wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
    struct list_head    wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
    void            *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map  dep_map;
#endif
};

这里的字段基本都可以自解释,有一点是当竞争不激烈的时候,比如只有一个进程在等锁,那么就不会休眠而是切换成自旋等待,也就是忙等待,这样可以避免进入休眠。 很多年前记得遇到过一个cpu 100%问题,那时候没想明白为什么一个等锁操作会占满cpu,这儿就是答案了。 对应的api就简单了:

extern void mutex_lock(struct mutex *lock);
extern int __must_check mutex_lock_interruptible(struct mutex *lock);
extern int __must_check mutex_lock_killable(struct mutex *lock);
extern int mutex_trylock(struct mutex *lock);
extern void mutex_unlock(struct mutex *lock);

读写锁

针对读写进一步优化就是读写锁,也有两种读写锁,一种是自旋的读写锁,一种是信号量类型的读写锁,先看下自旋的读写锁:

typedef struct {
    arch_rwlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCK
    unsigned int magic, owner_cpu;
    void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
} rwlock_t;


typedef struct {
    u32 lock;
} arch_rwlock_t;

对应的api 如下:

#define read_trylock(lock)  __cond_lock(lock, _raw_read_trylock(lock))
#define write_trylock(lock) __cond_lock(lock, _raw_write_trylock(lock))

#define write_lock(lock)    _raw_write_lock(lock)
#define read_lock(lock)     _raw_read_lock(lock)

也有关中断版本,如下:

#define read_lock_irq(lock)     _raw_read_lock_irq(lock)
#define read_lock_bh(lock)      _raw_read_lock_bh(lock)
#define write_lock_irq(lock)        _raw_write_lock_irq(lock)
#define write_lock_bh(lock)     _raw_write_lock_bh(lock)
#define read_unlock(lock)       _raw_read_unlock(lock)
#define write_unlock(lock)      _raw_write_unlock(lock)
#define read_unlock_irq(lock)       _raw_read_unlock_irq(lock)
#define write_unlock_irq(lock)      _raw_write_unlock_irq(lock)

信号量版本如下:

struct rw_semaphore {
    atomic_long_t count;
    /*
     * Write owner or one of the read owners as well flags regarding
     * the current state of the rwsem. Can be used as a speculative
     * check to see if the write owner is running on the cpu.
     */
    atomic_long_t owner;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
    struct optimistic_spin_queue osq; /* spinner MCS lock */
#endif
    raw_spinlock_t wait_lock;
    struct list_head wait_list;
#ifdef CONFIG_DEBUG_RWSEMS
    void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map  dep_map;
#endif
};

这儿的count值含义比较复杂,需要表示读写状态,也需要包含等待进程的状态。对应的api如下:

extern void down_read(struct rw_semaphore *sem);
extern int __must_check down_read_interruptible(struct rw_semaphore *sem);
extern int __must_check down_read_killable(struct rw_semaphore *sem);
extern int down_read_trylock(struct rw_semaphore *sem);
extern void down_write(struct rw_semaphore *sem);
extern int __must_check down_write_killable(struct rw_semaphore *sem);
extern int down_write_trylock(struct rw_semaphore *sem);
extern void up_read(struct rw_semaphore *sem);
extern void up_write(struct rw_semaphore *sem);
extern void downgrade_write(struct rw_semaphore *sem);

RCU

在读写锁场景下,多个读路径可以共存,只有写出现的时候才会互斥,这样的性能还不够高,最理想的状态是读写可以共存,这就是RCU机制(read copy update)。 RCU本质上就是发布订阅模式,读的时候就是订阅变量的变化,修改变量就是发布变量的变化,在发布变量变化时读路径有可能看到新变化也可能看不到新变化,但是可以保证是看到的会是一个有效的变量,只有当所有读操作结束后,旧变量才会被释放。有这种操作后,RCU比起读写锁的性能优势就出来了What is RCU? Part 2: Usage

image.png

而将读写锁换成RCU也比较简单,如下所示:

1 struct el {                          1 struct el {
 2   struct list_head list;             2   struct list_head list;
 3   long key;                          3   long key;
 4   spinlock_t mutex;                  4   spinlock_t mutex;
 5   int data;                          5   int data;
 6   /* Other data fields */            6   /* Other data fields */
 7 };                                   7 };
 8 rwlock_t listmutex;                  8 spinlock_t listmutex;
 9 struct el head;                      9 struct el head;
 1 int search(long key, int *result)    1 int search(long key, int *result)
 2 {                                    2 {
 3   struct list_head *lp;              3   struct list_head *lp;
 4   struct el *p;                      4   struct el *p;
 5                                      5
 6   read_lock(&listmutex);             6   rcu_read_lock();
 7   list_for_each_entry(p, head, lp) { 7   list_for_each_entry_rcu(p, head, lp) {
 8     if (p->key == key) {             8     if (p->key == key) {
 9       *result = p->data;             9       *result = p->data;
10       read_unlock(&listmutex);      10       rcu_read_unlock();
11       return 1;                     11       return 1;
12     }                               12     }
13   }                                 13   }
14   read_unlock(&listmutex);          14   rcu_read_unlock();
15   return 0;                         15   return 0;
16 }                                   16 }
 1 int delete(long key)                 1 int delete(long key)
 2 {                                    2 {
 3   struct el *p;                      3   struct el *p;
 4                                      4
 5   write_lock(&listmutex);            5   spin_lock(&listmutex);
 6   list_for_each_entry(p, head, lp) { 6   list_for_each_entry(p, head, lp) {
 7     if (p->key == key) {             7     if (p->key == key) {
 8       list_del(&p->list);            8       list_del_rcu(&p->list);
 9       write_unlock(&listmutex);      9       spin_unlock(&listmutex);
                                       10       synchronize_rcu();
10       kfree(p);                     11       kfree(p);
11       return 1;                     12       return 1;
12     }                               13     }
13   }                                 14   }
14   write_unlock(&listmutex);         15   spin_unlock(&listmutex);
15   return 0;                         16   return 0;
16 }                                   17 }

看下关键API:

image.png