“原子”是不可分割的意思,原子操作是指一个实际运行的操作不可分割,这个运行必然会被执行并完成而不会被另外一个任务或者事件打断。也就说,它是最小的执行单位,不可能有比它更小的执行单位。linux原子操作的问题来源于中断、进程抢占以及多核SMP系统中程序并发执行访问临界区。为了防止临界区数据的混乱,通过原子操作来保证其数据的原子操作。这里的临界区域分为全局或者局部静态变量和其他的混合临界区,对于混合临界区的原子操作需要通过复杂的锁机制保证,对于变量其原子操作以来实际的硬件平台完成其数据的原子操作。这篇博客主要讨论某一个变量的原子操作。

对于变量的原子操作,在linux系统当中经典实例是实现资源的技术,很多引用计数(/linux/fork.c)都是通过原子操作实现(类似于C++中智能指针的引用)。当然,也可以通过原子操作来对某个操作或者某些资源上锁,具体操作只需要在资源开始计数加1,并做防护判定,结束的时候释放计数减1即可。不过这并不是原子操作常见的场景。

1. 硬件原子操作

1.1. 缘由

我们的程序逻辑经常遇到这样的操作序列:

  • 读一个位于memory中的变量的值到寄存器中
  • 修改该变量的值(也就是修改寄存器中的值)
  • 将寄存器中的数值写回memory中的变量值

如果这个操作序列是串行化的操作(在一个thread中串行执行),那么一切OK,然而,世界总是不能如你所愿。在多CPU体系结构中,运行在两个CPU上的两个内核控制路径同时并行执行上面操作序列,有可能发生下面的场景:

CPU1上的操作CPU2上的操作
读操作
读操作
修改修改
写操作
写操作

多个CPUs和memory chip是通过总线互联的,在任意时刻,只能有一个总线master设备(例如CPU、DMA controller)访问该Slave设备(在这个场景中,slave设备是RAM chip)。因此,来自两个CPU上的读memory操作被串行化执行,分别获得了同样的旧值。完成修改后,两个CPU都想进行写操作,把修改的值写回到memory。但是,硬件arbiter的限制使得CPU的写回必须是串行化的,因此CPU1首先获得了访问权,进行写回动作,随后,CPU2完成写回动作。在这种情况下,CPU1的对memory的修改被CPU2的操作覆盖了,因此执行结果是错误的。
不仅是多CPU,在单CPU上也会由于有多个内核控制路径的交错而导致上面描述的错误。一个具体的例子如下:

系统调用的控制路径中断handler控制路径
读操作
读操作
修改
写操作
修改
写操作

系统调用的控制路径上,完成读操作后,硬件触发中断,开始执行中断handler。这种场景下,中断handler控制路径的写回的操作被系统调用控制路径上的写回覆盖了,结果也是错误的。

1.2. 硬件

数据变量的原子操作是和具体的处理器架构有密切关系的,对于不同的架构linux内核中都有相应的文件完成特定的实现。处理器分为复杂指令集CISC和精简指令集RICS两种。对于复杂指令集的芯片,如X86,可以通过一条汇编指令完成原子的一个数据位的读取,修改、写入三个动作。但是,对于精简指令集,完成一个内存的修改需要先将数据加载到内存当中,计算之后再讲数据导入到内存当中。因此,像a++这样的操作在精简指令集架构芯片上操作不是原子的,必须翻译成rmw三个基本的操作。如果这个操作流程中途被打断,执行结果可能就不符合预期结果了。为了解决这个问题,常见的有两种芯片解决方式。一种芯片吧寄存器分类成了set和clear两个寄存器,需要修改的时候只需要将某个bit写为1,如果需要清除就使用clear命令完成,这样就可以保证操作的原子性。另外一种是bitband技术,对于一个32位寄存器,有一个32位影子寄存器对应。想改变一个这个寄存器的某一个bit值,只需要先修改影子寄存器,硬件完成两个寄存器的同步。

对于ARM处理器,读取的时候调用ldrex,写的时候使用strex组合命令实现变量原子操作。当两个线程同时使用这一组命令时,就会把并行的序列变成串行序列,只有一个完成最后的保存后面的相同变量的操作才能够再进行。ARM编译器不具备将特定操作翻译成ldrex和strex组合,需要手动完成汇编代码编写。具体这部分汇编操作见博客原子操作 - ARM汇编指令实例(一)_生活需要深度的博客-CSDN博客

2. 原子操作API

位原子操作

  // arch/arm/include/asm/bitops.h
  set_bit(nr, void *addr)      // addr内存中的nr位置1
  clear_bit
  change_bit
  test_bit

在linux内核系统当中原子操作是一个不需要数据编译优化的的整数结构体,直接完成对相关变量存储位置的硬件资源访问。其常用接口说明如表一所示。

typedef struct { volatile int counter; } atomic_t;

2.1. 接口API 

宏或函数说明
atomic_read(atomic_t * v);对原子类型的变量进行原子读操作,它返回原子类型的变量v的值。
atomic_set(atomic_t *v,int i)设置原子变量结构体内部counter的值为i
atomic_add(int i,atomic_t *v)给原子变量结构体内部counter值加i
atomic_inc(atomic_t *v)给原子变量结构体内部counter值加1
atomic_sub(int i,atomic_t *v)给原子变量结构体内部counter值减i
atomic_dec(atomic_t *v)给原子变量结构体内部counter值减1
atomic_add_return(int i, atomic_t *v)对原子变量结构体内部counter加i,返回加以后的结果
atomic_inc_return(atomic_t *v)对原子变量结构体内部counter加1,返回加以后的结果
atomic_sub_return(int i, atomic_t *v)对原子变量结构体内部的counter减i,返回减以后的结果
atomic_dec_return(atomic_t * v)对原子变量结构体内部的counter减1,返回减以后的结果
atomic_add_and_test(int i, atomic_t *v)对原子变量接头体内部的counter加i,并将结果与0进行比较,返回对应的逻辑比较结果
atomic_inc_and_test(atomic_t *v)
atomic_sub_and_test(int i, atomic_t *v)对原子变量结构体内部的counter减i,并将结果与0进行比较,返回对应的逻辑比较结果
atomic_dec_and_test(atomic_t *v)对原子变量结构体内部的counter减1,并将结果与0进行比较,返回对应的逻辑比较结果

2.3. 怎么使用

原子操作通常用于实现资源的引用计数,在TCP/IP协议栈的IP碎片处理中,就使用了引用计数,碎片队列结构structipq描述了一个IP碎片,字段refcnt就是引用计数器,它的类型为atomic_t,当创建IP碎片时(在函数ip_frag_create中),使用atomic_set函数把它设置为1,当引用该IP碎片时,就使用函数atomic_inc把引用计数加1,当不需要引用该IP碎片时,就使用函数ipq_put来释放该IP碎片,ipq_put使用函数atomic_dec_and_test把引用计数减1并判断引用计数是否为0,如果是就释放Ip碎片。函数ipq_kill把IP碎片从ipq队列中删除,并把该删除的IP碎片的引用计数减1(通过使用函数atomic_dec实现)。

2.4. 驱动原子操作(非经典使用)

#include <linux/init.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/device.h>
#include <linux/cdev.h>

MODULE_LICENSE("GPL");

dev_t dev;
struct cdev btn_cdev;
struct class *cls = NULL;

atomic_t btn_tv;       /*[1] 定义整型原子变量*/

int btn_open(struct inode *inode, struct file *filp) 
{
    if(!atomic_dec_and_test(&btn_tv)) { //保持减1操作的完整性
        atomic_inc(&btn_tv);			//[3] 保持为0
        return -EBUSY;
    }
    return 0;  //第一次进来return 0,打开成功且btn_tv=0;
}

int btn_close(struct inode *inode, struct file *filp)
{
    atomic_inc(&btn_tv);  //btn_tv=1,保持可以打开状态
    return 0;
}

struct file_operations btn_fops =
{
    .owner = THIS_MODULE,
    .open  = btn_open,
    .release = btn_close,
};

int __init btn_drv_init(void)
{
    alloc_chrdev_region(&dev, 100, 1, "mybuttons");        /*设备号的动态申请注册*/
    cdev_init(&btn_cdev, &btn_fops);                       /*初始化cdev*/
    cdev_add(&btn_cdev, dev, 1);                           /*注册cdev*/
    cls = class_create(THIS_MODULE, "buttons");            /*设备文件的自动创建*/
    device_create(cls, NULL, dev, NULL,"mybuttons");
   
    atomic_set(&btn_tv, 1);                                /*[2] 整型原子变量赋值为1*/
    return 0;
}

void __exit btn_drv_exit(void)
{
    /*销毁设备文件*/
    device_destroy(cls, dev);
    class_destroy(cls);
    /*注销cdev*/
    cdev_del(&btn_cdev);
    /*注销设备号*/
    unregister_chrdev_region(dev, 1);
}

module_init(btn_drv_init);
module_exit(btn_drv_exit);
obj-m  += btn_drv.o
all:
       make -C /home/chuckchee/driver/kernel M=$(PWD) modules
       cp *.ko ../../rootfs
       arm-cortex_a9-linux-gnueabi-gcc test.c -o test
       cp test ../../rootfs
clean:
       make -C /home/chuckchee/driver/kernel M=$(PWD) clean
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

int main(void)
{
    int fd = open("/dev/mybuttons", O_RDWR); //读写方式打开

    if(fd < 0) {
        perror("open failed:");
        return -1;
    }
    printf("open successed: using device 20s...\n");
    sleep(20);
    printf("close device\n");
    close(fd);
    return 0;
}

原子操作主要实现当前某个设备驱动节点在同一时间只能背一个应用程序打开,只有当前驱动节点关闭以后下一个应用程序才能再次对这个文件节点打开并完成后期的操作。

2.5. 原子操作实现互斥

为便于比较,直接基于前篇文章:线程同步之互斥锁中的示例程序进行修改,用原子库取代互斥库的代码如下:

//atomic1.cpp 使用原子库取代互斥库实现线程同步
#include <chrono>
#include <atomic>
#include <thread>
#include <iostream> 
std::chrono::milliseconds interval(100);
std::atomic<bool> readyFlag(false);     //原子布尔类型,取代互斥量
std::atomic<int> job_shared(0); //两个线程都能修改'job_shared',将该变量特化为原子类型
int job_exclusive = 0; //只有一个线程能修改'job_exclusive',不需要保护
//此线程只能修改 'job_shared'
void job_1()
{   
    std::this_thread::sleep_for(5 * interval);
    job_shared.fetch_add(1);
    std::cout << "job_1 shared (" << job_shared.load() << ")\n";
    readyFlag.store(true);      //改变布尔标记状态为真
}
// 此线程能修改'job_shared'和'job_exclusive'
void job_2()
{
    while (true) {    //无限循环,直到可访问并修改'job_shared'
        if (readyFlag.load()) {     //判断布尔标记状态是否为真,为真则修改‘job_shared’
            job_shared.fetch_add(1);
            std::cout << "job_2 shared (" << job_shared.load() << ")\n";
            return;
        } else {      //布尔标记为假,则修改'job_exclusive'
            ++job_exclusive;
            std::cout << "job_2 exclusive (" << job_exclusive << ")\n";
            std::this_thread::sleep_for(interval);
        }
    }
}
int main() 
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);
    thread_1.join();
    thread_2.join();
    getchar();
    return 0;
}

由示例程序可以看出,原子布尔类型可以实现互斥锁的部分功能,但在使用条件变量condition variable时,仍然需要mutex保护对condition variable的消费,即使condition variable是一个atomic object。

3.  CAS无锁编程

3.1. 什么是无锁编程

在原子操作出现之前,对共享数据的读写可能得到不确定的结果,所以多线程并发编程时要对使用锁机制对共享数据的访问过程进行保护。但锁的申请释放增加了访问共享资源的消耗,且可能引起线程阻塞、锁竞争、死锁、优先级反转、难以调试等问题。

现在有了原子操作的支持,对单个基础数据类型的读、写访问可以不用锁保护了,但对于复杂数据类型比如链表,有可能出现多个核心在链表同一位置同时增删节点的情况,这将会导致操作失败或错序。所以我们在对某节点操作前,需要先判断该节点的值是否跟预期的一致,如果一致则进行操作,不一致则更新期望值,这几步操作依然需要实现为一个RMW(Read-Modify-Write)原子操作,这就是前面提到的CAS(Compare And Swap)原子操作,它是无锁编程中最常用的操作。

既然无锁编程是为了解决锁机制带来的一些问题而出现的,那么无锁编程就可以理解为不使用锁机制就可保证多线程间原子变量同步的编程。无锁(lock-free)的实现只是将多条指令合并成了一条指令形成一个逻辑完备的最小单元,通过兼容CPU指令执行逻辑形成的一种多线程编程模型。

无锁编程是基于原子操作的,对基本原子类型的共享访问由load()与store(val)即可保证其并发同步,对抽象复杂类型的共享访问则需要更复杂的CAS来保证其并发同步,并发访问过程只是不使用锁机制了,但还是可以理解为有锁止行为的,其粒度很小,性能更高。对于某个无法实现为一个原子操作的并发访问过程还是需要借助锁机制来实现。

3.1 CAS原子操作实现无锁编程

CAS原子操作主要是通过函数a.compare_exchange(expected,desired)实现的,其语义为“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”,CAS算法的实现伪码如下:

bool compare_exchange_strong(T& expected, T desired) 
{ 
    if( this->load() == expected ) { 
        this->store(desired); 
        return true; 
    } else {
        expected = this->load();
    	return false; 
    } 
}

下面尝试实现一个无锁栈,代码如下:

//atomic3.cpp 使用CAS操作实现一个无锁栈
#include <atomic>
#include <iostream>
template<typename T>
class lock_free_stack
{
private:
    struct node
    {
        T data;
        node* next;
        node(const T& data) : data(data), next(nullptr) {}
    };
    std::atomic<node*> head;
 public:
    lock_free_stack(): head(nullptr) {}
    void push(const T& data)
    {
        node* new_node = new node(data);
        do{
            new_node->next = head.load();   //将 head 的当前值放入new_node->next
        }while(!head.compare_exchange_strong(new_node->next, new_node));
        // 如果新元素new_node的next和栈顶head一样,证明在你之前没人操作它,使用新元素替换栈顶退出即可;
        // 如果不一样,证明在你之前已经有人操作它,栈顶已发生改变,该函数会自动更新新元素的next值为改变后的栈顶;
        // 然后继续循环检测直到状态1成立退出;
    }
    T pop()
    {
        node* node;
        do{
            node = head.load();
        }while (node && !head.compare_exchange_strong(node, node->next));
        if(node) 
            return node->data;
    }
};
 
int main()
{
    lock_free_stack<int> s;
    s.push(1);
    s.push(2);
    s.push(3);
    std::cout << s.pop() << std::endl;
    std::cout << s.pop() << std::endl;
    
    getchar();
    return 0;
}

程序注释中已经解释的很清楚了,在将数据压栈前,先通过比较原子类型head与新元素的next指向对象是否相等来判断head是否已被其他线程修改,根据判断结果选择是继续操作还是更新期望,而这一切都是在一个原子操作中完成的,保证了在不使用锁的情况下实现共享数据的并发同步。

CAS 看起来很厉害,但也有缺点,最著名的就是 ABA 问题,假设一个变量 A ,修改为 B之后又修改为 A,CAS 的机制是无法察觉的,但实际上已经被修改过了。如果在基本类型上是没有问题的,但是如果是引用类型呢?这个对象中有多个变量,我怎么知道有没有被改过?聪明的你一定想到了,加个版本号啊。每次修改就检查版本号,如果版本号变了,说明改过,就算你还是 A,也不行。

上面的例子节点指针也属于引用类型,自然也存在ABA问题,比如在线程2执行pop操作,将A,B都删掉,然后创建一个新元素push进去,因为操作系统的内存分配机制会重复使用之前释放的内存,恰好push进去的内存地址和A一样,我们记为A’,这时候切换到线程1,CAS操作检查到A没变化成功将B设为栈顶,但B是一个已经被释放的内存块。该问题的解决方案就是上面说的通过打标签标识A和A’为不同的指针,具体实现代码读者可以尝试实现。

Logo

更多推荐