无锁队列的一种实现

无锁队列的一种实现

无锁队列

无锁编程,称为 lock-free,是利用处理器的一些特殊原子指令来避免传统并行设计中对锁的使用,例如 x86 平台下对应的指令是 CMPXCHG。使用无锁编程至少能够带来两个方面的好处,首先是避免锁的争用,提高性能,其次通过避免锁的使用,避免死锁。无锁队列是无锁编程中最基础的,本文在这里给出一个实现。

一、无锁编程基础

无锁编程最基本的一个操作是 CAS(compare and swap),无锁编程的操作用 C++ 语言描述的话如下所示,简而言之就是看看 *reg 里面的内容是不是 oldval,如果是的话,对其复制为 newval。

1
2
3
4
5
6
7
8
template <typename T>
bool compare_and_swap(T *reg, T oldval, T newval) {
if (*reg = oldval) {
*reg = newval;
return true;
}
return false;
}

CAS 实现是依赖编译器的:

  • GCC 的 CAS
    GCC4.1+ 版本支持 CAS 的原子操作。
1
2
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
  • Windows 的 CAS
1
2
3
InterlockedCompareExchange ( __inout LONG volatile *Target,
__in LONG Exchange,
__in LONG Comperand);
  • C++11 中的 CAS
1
2
3
4
5
6
template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,
T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
T* expected, T desired );

二、无锁队列的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <iostream>
using namespace std;

struct ListNode {
ListNode *next;
int value;
ListNode (int val) : value(val), next(NULL) { }
};

class CASQueue {

private:
ListNode *head;
ListNode *tail;

public:
CASQueue() {
head = new ListNode(0);
tail = head;
}

void push(int value) {
ListNode *node = new ListNode(value);
ListNode *p = tail;
do {
while (p->next != NULL) {
p = p->next;
}
} while (__sync_bool_compare_and_swap(&p->next, NULL, node) != true);
__sync_bool_compare_and_swap(&tail, p, node);
}

int pop() {
ListNode *node = head;
ListNode *p;
do {
p = head;
} while (__sync_bool_compare_and_swap(&head, p, p->next) != true);
int val = node->next->value;
delete node->next;
node->next = NULL;
return val;
}
};

int main() {
CASQueue queue;
queue.push(3);
queue.push(4);
cout << queue.pop() << endl;
return 0;
}

三、CAS 的 ABA 问题

CAS 的 ABA 问题可以概括如下:

  1. 进程 P1 在共享变量中读到值为 A
  2. P1 被抢占了,进程 P2 执行
  3. P2 把共享变量里的值从 A 改成了 B,再改回到 A,此时被 P1 抢占
  4. P1 回来看到共享变量里的值没有被改变,于是继续执行

虽然 P1 以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA 问题最容易发生在 lock free 的算法中的,CAS 首当其冲,因为 CAS 判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)。

维基百科上有一个比较通俗的比喻:

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

四、解决 ABA 问题

可以用 double CAS,双保险 CAS 解决 ABA 问题,例如在 32 位系统上,我们要检查 64 位长的内容。

  • 一次用 CAS 检查双倍长度的值,前半部是指针,后半部分是一个计数器。
  • 只有这两个都一样,才算通过检查,要赋新的值。并把计数器累加 1。
  • 这样一来,ABA 发生时,虽然值一样,但是计数器就不一样(但是在 32 位的系统上,这个计数器会溢出回来又从 1 开始的,这还是会有 ABA 的问题)。

参考原文:
无锁队列的实现

评论

:D 一言句子获取中...

加载中,最新评论有1分钟缓存...