|
|
|
UnBounded Queue: 並行処理編 Part 2ソースはGitHubに移行しました。 CAS based Lock-Free Queue
CASを使ったLock-Free Queueの論文"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"で提案されたアルゴリズムを実装。
例えばLock-Free Hashでスレッド毎に2つのhashテーブルを必要とするアルゴリズムがあったりする。 上限10スレッドなら20のhashテーブルがメモリ上に予め確保されて、複雑怪奇なアルゴリズムが動くわけである。CASによるLock-FreeなアルゴリズムはABA問題がつきまとう。 この実装ではアドレスは32bitと仮定し、アドレス(32bit)とラベル(uint_t:32bit)をペアにして64bitのデータとして扱うことで ABA問題を避けている。これはCASによるLock-Freeアルゴリズム実装の常套手段の一つのようだ。 X86_64環境のインラインアセンブラはまだ不慣れなため、今のところ32-bit版Linuxディストリビューション上でのみ動作保証。 Intel64(X86_64)環境は近々に対応したいと考えている。 ソースはGitHubに移行しました。 データ構造データ構造はほぼ論文通り。
typedef struct _pointer_t {
unsigned int count;
struct _node_t *ptr;
}__attribute__((packed)) pointer_t;
typedef struct _node_t {
val_t val;
pointer_t next;
}__attribute__((packed)) node_t;
typedef struct _queue_t
{
pointer_t head;
pointer_t tail;
} queue_t;
基本関数Lock-Freeのためのプリミティブほとんど素のcmpxchg8b。
inline bool_t
cas64(pointer_t * addr, const pointer_t oldp, const pointer_t newp)
{
char result;
__asm__ __volatile__("lock; cmpxchg8b %0; setz %1":"=m"(*addr),
"=q"(result)
:"m"(*addr), "a"(oldp.count), "d"(oldp.ptr),
"b"(newp.count), "c"(newp.ptr)
:"memory");
return (((int) result != 0) ? true : false);
}
queueの生成
queue_t *init_queue(void)
{
queue_t *q;
node_t *node;
if ((q = (queue_t *) calloc(1, sizeof(queue_t))) == NULL) {
elog("calloc error");
return NULL;
}
if ((node = create_node((val_t)NULL)) == NULL) {
elog("create_node() error");
abort();
}
q->head.ptr = node;
q->tail.ptr = node;
return q;
}
void free_queue(queue_t * q)
{
free(q);
}
enqueue
bool_t enq(queue_t * q, const val_t val)
{
node_t *newNode;
pointer_t tail, next, tmp;
if ((newNode = create_node(val)) == NULL)
return false;
while (1) {
tail = q->tail;
next = tail.ptr->next;
if (tail.count == q->tail.count && tail.ptr == q->tail.ptr) {
if (next.ptr == NULL) {
tmp.ptr = newNode; tmp.count = next.count + 1;
if (cas64(&tail.ptr->next, next, tmp) == true) {
break;
}
}
else {
tmp.ptr = next.ptr; tmp.count = tail.count + 1;
cas64(&q->tail, tail, tmp);
}
}
}
tmp.ptr = newNode; tmp.count = tail.count + 1;
cas64(&q->tail, tail, tmp);
return true;
}
static node_t *create_node(const val_t val)
{
node_t *node;
if ((node = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
return NULL;
}
node->val = val;
node->next.ptr = NULL;
node->next.count = 0;
return node;
}
dequeue
bool_t deq(queue_t * q, val_t * val)
{
pointer_t head, tail, next, tmp;
while (1) {
head = q->head;
tail = q->tail;
next = head.ptr->next;
if (head.count == q->head.count && head.ptr == q->head.ptr) {
if (head.ptr == tail.ptr) {
if (next.ptr == NULL) {
return false;
}
tmp.ptr = next.ptr; tmp.count = head.count + 1;
cas64(&q->tail, tail, tmp);
}
else {
*val = next.ptr->val;
tmp.ptr = next.ptr; tmp.count = head.count + 1;
if (cas64(&q->head, head, tmp) == true) {
break;
}
}
}
}
free_node (head.ptr);
return true;
}
実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|