UnBounded Queue: 並行処理編 Part 2

ソースはGitHubに移行しました。

CAS based Lock-Free Queue

CASを使ったLock-Free Queueの論文"Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"で提案されたアルゴリズムを実装。

特徴としては:

  1. 非常にシンプル
  2. スレッド数に制限がないこと(予めスレッド数の上限を設定しなくてよい)
  3. 使われるメモリ領域はqueueの要素数に比例し、スレッド数に依存しない(スレッドはメモリ領域を要求しない)
これはかなり良い性質である。なぜならLock-Freeなアルゴリズムの中には複雑怪奇なものが多く、 予めスレッド数の上限を指定しなければならないものや、スレッド毎にかなりのメモリ領域を要求するものも多いからだ。
例えばLock-Free Hashでスレッド毎に2つのhashテーブルを必要とするアルゴリズムがあったりする。 上限10スレッドなら20のhashテーブルがメモリ上に予め確保されて、複雑怪奇なアルゴリズムが動くわけである。


CASによるLock-FreeなアルゴリズムはABA問題がつきまとう。 この実装ではアドレスは32bitと仮定し、アドレス(32bit)とラベル(uint_t:32bit)をペアにして64bitのデータとして扱うことで ABA問題を避けている。これはCASによるLock-Freeアルゴリズム実装の常套手段の一つのようだ。

X86_64環境のインラインアセンブラはまだ不慣れなため、今のところ32-bit版Linuxディストリビューション上でのみ動作保証。 Intel64(X86_64)環境は近々に対応したいと考えている。

ソースはGitHubに移行しました。

データ構造

データ構造はほぼ論文通り。

typedef struct _pointer_t {
  unsigned int count;
  struct _node_t *ptr;
}__attribute__((packed)) pointer_t;


typedef struct _node_t {
  val_t val;
  pointer_t next;
}__attribute__((packed)) node_t;

typedef struct _queue_t
{
  pointer_t head;
  pointer_t tail;
} queue_t;

基本関数

Lock-Freeのためのプリミティブ

ほとんど素のcmpxchg8b。

inline bool_t
cas64(pointer_t * addr, const pointer_t oldp, const pointer_t newp)
{
    char result;
    __asm__ __volatile__("lock; cmpxchg8b %0; setz %1":"=m"(*addr),
			 "=q"(result)
			 :"m"(*addr), "a"(oldp.count), "d"(oldp.ptr),
			 "b"(newp.count), "c"(newp.ptr)
			 :"memory");
    return (((int) result != 0) ? true : false);
}
queueの生成

queue_t *init_queue(void)
{
    queue_t *q;
    node_t *node;

    if ((q = (queue_t *) calloc(1, sizeof(queue_t))) == NULL) {
      elog("calloc error");
	return NULL;
    }

    if ((node = create_node((val_t)NULL)) == NULL) {
      elog("create_node() error");
      abort();
    }

    q->head.ptr = node;
    q->tail.ptr = node;

    return q;
}

void free_queue(queue_t * q)
{
  free(q);
}

enqueue
bool_t enq(queue_t * q, const val_t val)
{
    node_t *newNode;
    pointer_t tail, next, tmp;

    if ((newNode = create_node(val)) == NULL)
	return false;

    while (1) {
	tail = q->tail;
	next = tail.ptr->next;

	if (tail.count == q->tail.count && tail.ptr == q->tail.ptr) {
	  if (next.ptr == NULL) {
	    tmp.ptr = newNode;	    tmp.count = next.count + 1;
	    if (cas64(&tail.ptr->next, next, tmp) == true) {
	      break;
	    }
	  }
	  else {
	    tmp.ptr = next.ptr;	    tmp.count = tail.count + 1;
	    cas64(&q->tail, tail, tmp);
	  }
	}
    }
    tmp.ptr = newNode;    tmp.count = tail.count + 1;
    cas64(&q->tail, tail, tmp);

    return true;
}


static node_t *create_node(const val_t val)
{
    node_t *node;

    if ((node = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
      elog("calloc error");
	return NULL;
    }

    node->val = val;
    node->next.ptr = NULL;
    node->next.count = 0;

    return node;
}

dequeue
bool_t deq(queue_t * q, val_t * val)
{
  pointer_t head, tail, next, tmp;
 
    while (1) {
	head = q->head;
	tail = q->tail;
	next = head.ptr->next;

	if (head.count == q->head.count && head.ptr == q->head.ptr) {
	  if (head.ptr == tail.ptr) {
	    if (next.ptr == NULL) {
	      return false;
	    }
	    tmp.ptr = next.ptr;	    tmp.count = head.count + 1;
	    cas64(&q->tail, tail, tmp);
	  }
	  else {
	    *val = next.ptr->val;
	    tmp.ptr = next.ptr;	    tmp.count = head.count + 1;
	    if (cas64(&q->head, head, tmp) == true) {
	      break;
	    }
	  }
	}
    }

    free_node (head.ptr);
    return true;
}

実行

ソースはGitHubに移行しました。



Last-modified: 2014-7-6