Go/Rust 系统编程:无锁数据结构与 CAS 并发控制的深度剖析

cover

一、锁的性能代价:为什么 Mutex 不是高并发场景的最优解

在高并发场景下,互斥锁(Mutex)的性能瓶颈不仅来自锁竞争本身,更来自锁带来的缓存一致性开销。当一个 CPU 核心释放锁时,需要将修改的缓存行失效并同步到其他核心,这个过程涉及 MESI 协议的缓存行失效广播,延迟可达数百个时钟周期。当竞争激烈时,线程大部分时间花在等待锁和缓存同步上,而非执行业务逻辑。无锁数据结构通过原子操作(CAS)替代互斥锁,避免了锁竞争和缓存一致性开销,但引入了 ABA 问题和内存回收难题。

二、CAS 原理与无锁编程模型

CAS(Compare-And-Swap)是 CPU 提供的原子指令:比较内存位置的值与预期值,如果相等则写入新值,否则返回失败。CAS 是无锁编程的基石——所有无锁数据结构都通过 CAS 实现线程安全的状态变更。

graph TD
    A[线程读取共享变量 V] --> B[计算新值]
    B --> C[CAS: V == 旧值?]
    C -->|是| D[写入新值<br/>操作成功]
    C -->|否| E[重试:重新读取 V]
    E --> A

    F[ABA 问题] --> G[线程1 读 V=A]
    G --> H[线程2 将 V 改为 B 再改回 A]
    H --> I[线程1 CAS 成功<br/>但中间状态已被修改]

    I --> J[解决方案]
    J --> K[版本号 CAS<br/>携带单调递增版本]
    J --> L[Hazard Pointer<br/>延迟回收机制]

    style D fill:#e8f5e9
    style I fill:#ffcdd2
    style K fill:#e1f5fe
    style L fill:#c8e6c9

CAS 的核心挑战是 ABA 问题:线程 1 读取变量值为 A,线程 2 将其改为 B 又改回 A,线程 1 的 CAS 仍然成功,但变量实际上经历了中间状态变更。解决方案是在 CAS 中携带版本号,每次修改递增版本号,CAS 同时比较值和版本号。

三、无锁数据结构的工程实现

3.1 Go 无锁队列

package lockfree

import (
	"sync/atomic"
	"unsafe"
)

// node 表示队列中的节点
type node struct {
	value interface{}
	next  unsafe.Pointer // *node,使用 unsafe.Pointer 支持原子操作
}

// Queue 无锁 FIFO 队列,基于 Michael-Scott 算法
// 设计考量:队列由 head 和 tail 两个指针组成。
// head 指向哨兵节点(dummy node),tail 指向最后一个节点。
// 入队操作更新 tail,出队操作更新 head。
// 哨兵节点简化了空队列的边界处理。
type Queue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
}

func NewQueue() *Queue {
	dummy := &node{}
	ptr := unsafe.Pointer(dummy)
	return &Queue{
		head: ptr,
		tail: ptr,
	}
}

// Enqueue 入队:将值追加到队列尾部
// 使用 CAS 保证并发安全,失败时自旋重试
func (q *Queue) Enqueue(value interface{}) {
	newNode := &node{value: value}
	newPtr := unsafe.Pointer(newNode)

	for {
		tail := atomic.LoadPointer(&q.tail)
		tailNode := (*node)(tail)
		next := atomic.LoadPointer(&tailNode.next)

		// 再次检查 tail 是否被其他线程修改
		if tail != atomic.LoadPointer(&q.tail) {
			continue
		}

		if next == nil {
			// tail.next 为空,尝试将新节点链接到 tail.next
			if atomic.CompareAndSwapPointer(&tailNode.next, nil, newPtr) {
				// 尝试推进 tail 指针,失败也无妨(其他线程会帮忙推进)
				atomic.CompareAndSwapPointer(&q.tail, tail, newPtr)
				return
			}
		} else {
			// tail.next 不为空,说明 tail 落后了,帮忙推进
			atomic.CompareAndSwapPointer(&q.tail, tail, next)
		}
	}
}

// Dequeue 出队:从队列头部移除并返回值
// 返回 (value, true) 表示成功,(nil, false) 表示队列为空
func (q *Queue) Dequeue() (interface{}, bool) {
	for {
		head := atomic.LoadPointer(&q.head)
		headNode := (*node)(head)
		tail := atomic.LoadPointer(&q.tail)
		next := atomic.LoadPointer(&headNode.next)

		// 再次检查 head 是否被其他线程修改
		if head != atomic.LoadPointer(&q.head) {
			continue
		}

		if head == tail {
			// head == tail,队列为空或 tail 落后
			if next == nil {
				// 队列确实为空
				return nil, false
			}
			// tail 落后了,帮忙推进
			atomic.CompareAndSwapPointer(&q.tail, tail, next)
		} else {
			// 读取 head.next 的值
			value := (*node)(next).value
			// 尝试推进 head 指针
			if atomic.CompareAndSwapPointer(&q.head, head, next) {
				return value, true
			}
		}
	}
}

3.2 Rust 无锁栈(带 Hazard Pointer 内存回收)

use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

/// 无锁栈节点
struct Node<T> {
    value: T,
    next: *mut Node<T>,
}

/// Hazard Pointer:防止节点在被其他线程引用时被回收
/// 设计考量:无锁数据结构的内存回收是最棘手的问题。
/// 一个线程可能正在读取某个节点,而另一个线程已经将其出栈并释放。
/// Hazard Pointer 通过"声明正在使用的指针"来延迟回收。
struct HazardPointer {
    ptr: AtomicPtr<()>,
}

impl HazardPointer {
    fn new() -> Self {
        HazardPointer {
            ptr: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// 保护一个指针:将其记录在 Hazard Pointer 中
    fn protect(&self, p: *mut ()) {
        self.ptr.store(p, Ordering::SeqCst);
    }

    /// 清除保护
    fn clear(&self) {
        self.ptr.store(ptr::null_mut(), Ordering::SeqCst);
    }

    /// 获取当前保护的指针
    fn get(&self) -> *mut () {
        self.ptr.load(Ordering::SeqCst)
    }
}

/// 无锁栈:基于 Treiber Stack 算法
pub struct LockFreeStack<T> {
    head: AtomicPtr<Node<T>>,
    hazard_pointers: Vec<HazardPointer>,
}

impl<T> LockFreeStack<T> {
    pub fn new(num_threads: usize) -> Self {
        let mut hps = Vec::with_capacity(num_threads);
        for _ in 0..num_threads {
            hps.push(HazardPointer::new());
        }
        LockFreeStack {
            head: AtomicPtr::new(ptr::null_mut()),
            hazard_pointers: hps,
        }
    }

    /// 入栈:将值压入栈顶
    pub fn push(&self, value: T) {
        let new_node = Box::into_raw(Box::new(Node {
            value,
            next: ptr::null_mut(),
        }));

        loop {
            let current_head = self.head.load(Ordering::SeqCst);
            // 设置新节点的 next 指向当前 head
            unsafe { (*new_node).next = current_head; }

            // CAS:尝试将 head 从 current_head 更新为 new_node
            if self.head.compare_exchange(
                current_head,
                new_node,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ).is_ok() {
                return;
            }
            // CAS 失败,自旋重试
        }
    }

    /// 出栈:从栈顶弹出值
    /// thread_id 用于选择对应的 Hazard Pointer
    pub fn pop(&self, thread_id: usize) -> Option<T> {
        let hp = &self.hazard_pointers[thread_id];

        loop {
            let current_head = self.head.load(Ordering::SeqCst);

            if current_head.is_null() {
                hp.clear();
                return None;
            }

            // 保护当前 head,防止在 CAS 期间被其他线程释放
            hp.protect(current_head as *mut ());

            // 再次验证 head 未变(防止 ABA 问题)
            if self.head.load(Ordering::SeqCst) != current_head {
                continue;
            }

            let next = unsafe { (*current_head).next };

            // CAS:尝试将 head 从 current_head 更新为 next
            if self.head.compare_exchange(
                current_head,
                next,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ).is_ok() {
                hp.clear();

                // 检查是否有其他线程的 Hazard Pointer 正在保护此节点
                let can_free = !self.hazard_pointers.iter().any(|h| {
                    h.get() == current_head as *mut ()
                });

                if can_free {
                    unsafe {
                        let node = Box::from_raw(current_head);
                        return Some(node.value);
                    }
                } else {
                    // 有其他线程正在引用此节点,延迟回收
                    // 生产环境应将节点加入待回收列表,定期扫描
                    unsafe {
                        return Some(ptr::read(&(*current_head).value));
                    }
                }
            }
            // CAS 失败,自旋重试
        }
    }
}

四、无锁编程的边界与权衡

无锁数据结构并非在所有场景下都优于基于锁的实现。当竞争程度较低时,Mutex 的开销几乎可以忽略(fast path 只需一次原子操作),而无锁数据结构的 CAS 重试循环反而引入了更多指令。无锁方案的优势在竞争激烈时才显现——它避免了线程阻塞和上下文切换的开销。

内存回收是无锁编程最棘手的问题。Go 的 GC 自动管理内存,但 GC 的 Stop-The-World 暂停可能影响延迟敏感场景;Rust 没有 GC,必须手动管理 Hazard Pointer 或使用 Epoch-Based Reclamation(EBR)。EBR 的延迟回收窗口可能导致内存占用峰值比基于锁的实现高 2-3 倍。

ABA 问题的完整解决方案需要双宽度 CAS(Double-Width CAS),即同时比较指针和版本号。但并非所有平台都支持 128-bit CAS。在不支持双宽度 CAS 的平台上,需要使用带标签指针(Tagged Pointer)技巧,将版本号压缩到指针的未使用高位中,但这限制了可寻址空间。

五、总结

无锁数据结构通过 CAS 原子操作替代互斥锁,在高竞争场景下提供了更优的吞吐和尾延迟表现。核心实践包括:Michael-Scott 队列实现无锁 FIFO,Treiber Stack 实现无锁 LIFO,Hazard Pointer 解决内存回收问题,版本号 CAS 解决 ABA 问题。选型时应根据竞争程度决定——低竞争场景 Mutex 足够,高竞争场景无锁方案优势明显。内存回收策略需根据语言特性选择:Go 依赖 GC,Rust 使用 Hazard Pointer 或 EBR。

更多推荐