単方向リスト: 並行処理編 Part 3

ソースはGitHubに移行しました。

Non-Blocking Singly-linked List(Non-Blocking単方向リスト)

Non-Blocking Slingly-Linked Listについて、ここではCASベースの比較的単純なアルゴリズムを実装した。 "A Pragmatic Implementation of Non-Blocking Linked-Lists" Timothy L. Harris .pdf

アルゴリズムを簡単に図示する:

図の上部は避けなければならない状況の説明で、 削除(10)と挿入(20)が並行して実行された場合で、挿入するノード(20)が孤児になることをしめしている。
図の下部は今回実装したアルゴリズムで、削除するノードにマーキングして上記の状況を避ける。

ソースはGitHubに移行しました。

データ構造

データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。

nodeとlistのデータ構造を示す。

NonBlockingList.h:

typedef enum {UNMARKED = 0, MARKED = 1} node_stat;

typedef struct _next_ref {
  node_stat mark;
  void *node_ptr;
}__attribute__((packed)) next_ref;


typedef struct _node_t
{
  lkey_t key;                   /* key */
  val_t val;                    /* 値 */
  next_ref next;                /* 次のnodeへのリファレンス */
}__attribute__((packed)) node_t;

typedef struct _list_t
{
  node_t *head;
  node_t *tail;
} list_t;

list_tには、リストの先頭headと末尾tailが確保される。

基本関数

listの生成
/*
 * list_t *init_list(void)
 *
 * listを生成する。
 *
 * 成功すればlistへのポインタを返す。失敗ならNULLを返す。
 *
 */
list_t *init_list(void)
{
  list_t *list;
  
  if ((list = (list_t *) calloc(1, sizeof(list_t))) == NULL) {
    elog("calloc error");
    return NULL;
  }
  
  if ((list->head = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
    elog("calloc error");
    goto end;
  }
  
  if ((list->tail = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
    elog("calloc error");
    goto end;
  }

  list->head->key = INT_MIN;
  list->head->next = make_ref(list->tail, UNMARKED);

  list->tail->key = INT_MAX;
  list->tail->next = make_ref(NULL, UNMARKED);

  return list;

 end:
  free (list->head);
  free (list);
  return NULL;
}
ノードの検索

static inline bool_t
cas64(volatile next_ref * addr, const next_ref oldp, const next_ref newp)
{
  char result;
  __asm__ __volatile__("lock; cmpxchg8b %0; setz %1":"=m"(*(volatile next_ref *)addr),
		       "=q"(result)
		       :"m"(*(volatile next_ref *)addr), "a"(oldp.mark), "d"(oldp.node_ptr),
			"b"(newp.mark), "c"(newp.node_ptr)
		       :"memory");
  return ((int)result == true ? true : false);
}

#define is_marked_ref(ref)   (ref.mark == MARKED ? true:false)
#define is_unmarked_ref(ref) (ref.mark == UNMARKED ? true:false)

#define get_ptr(ref)          (ref.node_ptr)

static next_ref make_ref(node_t * node_ptr, const node_stat mark)
{
    next_ref ref;
    ref.mark = mark;
    ref.node_ptr = (node_t *) node_ptr;
    return ref;
}


#define get_unmarked_ref(ptr)  make_ref(ptr, UNMARKED)
#define get_marked_ref(ptr)    make_ref(ptr, MARKED)

static node_t *search(list_t * list, const lkey_t key, node_t **pred)
{
    node_t *pred_next = NULL;
    node_t *curr;

 search_again:
    do {
      node_t *t = list->head;
      node_t *t_next = (node_t *)get_ptr(list->head->next);

      /* step 1: find pred and curr */
      do {
	if (!is_marked_ref(t_next->next)) {
	  (*pred) = t;
	  pred_next = t_next;
	}

	t = t_next;	    t->next.mark = (int)UNMARKED;
	
	if (t == list->tail)
	  break;
	t_next = (node_t *)get_ptr(t->next);
      } while (t->key < key || is_marked_ref(t_next->next));
      assert(key <= t->key || is_unmarked_ref(t_next->next));

      curr = t;

      /* step 2: check nodes are adjacent */
      if (pred_next == curr) {
	if ((curr != list->tail) && (is_marked_ref(curr->next))) {
	  goto search_again;
	} else {	    
	  assert (pred_next == curr);
	  return curr;
	}
      }

      /* step 3: remove one or more marked nodes */
      if (cas64(&(*pred)->next, make_ref(pred_next, MARKED), make_ref(curr, UNMARKED)) == true) {

	if (curr != list->tail && is_marked_ref(curr->next)) {
	  goto search_again;
	} else {
	  assert (get_ptr((*pred)->next) == curr);
	  return curr;
	  goto end; // dummy
	}
      }
    }
    while (1);
    
 end:
    assert (get_ptr((*pred)->next) == curr);

    return curr;
}
ノードの追加

/*
 * bool_t add(list_t * list, const lkey_t key, const val_t val)
 *
 * listにnode(key,val)を追加する。
 * 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool add(list_t * list, const lkey_t key, const val_t val)
{
    node_t *pred = NULL;
    node_t *curr;
    node_t *newNode;

    if ((newNode = create_node(key, val)) == NULL)
      return false;

    do {
      curr = search(list, key, &pred);
      assert(pred->key < key && key <= curr->key);

      if ((curr != list->tail) && (curr->key == key)) {
	free_node(newNode);
	return false;
      }

      newNode->next = make_ref(curr, UNMARKED);
      if (cas64(&pred->next, make_ref(curr, UNMARKED), make_ref(newNode, UNMARKED)) == true) {
	break;
      }
    }
    while (1);

    return true;
}
関数add()内部で呼ぶnode生成関数create_node()を示す。
/*
 * node_t *create_node(const lkey_t key, const val_t val)
 *
 * (key, val)を持つnodeを生成する。
 *
 * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
 */
static node_t *create_node(const lkey_t key, const val_t val)
{
    node_t *node;

    if ((node = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
      elog("calloc error");
      return NULL;
    }

    node->next = make_ref(NULL, UNMARKED);

    node->key = key;
    node->val = val;

    return node;
}
ノードの削除

ノードの削除も基本戦略は追加と同じである。

/*
 * bool_t delete(list_t * list, const lkey_t key, val_t *val)
 *
 * listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t delete(list_t * list, const lkey_t key, val_t *val)
{
    node_t *pred = NULL;
    node_t *curr, *curr_next;
    bool_t ret = true;

    do {
      curr = search(list, key, &pred);

      assert(pred->key < key && key <= curr->key);

      if ((curr == list->tail) || (curr->key != key)) {
#ifdef _DEBUG_
	if (curr->key != key)
	  printf ("delete ERROR: key = %ld curr->key %ld\n", (long int)key, (long int)curr->key);
#endif
	return false;
      }

      curr_next = (node_t *)get_ptr(curr->next);

      if (is_unmarked_ref(curr_next->next)) {
	node_t *node_ptr = (node_t *)get_ptr(curr->next);
	if (cas64(&curr->next, make_ref(node_ptr, curr->next.mark), get_marked_ref(node_ptr)) == true)
	  break;
      }
      
    }
    while (1);

    if (cas64(&pred->next, get_unmarked_ref(curr), get_unmarked_ref(curr_next)) != true) {
      curr = search(list, curr->key, &pred);
    }

    *val = curr->val;
    free_node(curr);
    
    return ret;
}
ノードの検索
/*
 * bool_t find(list_t * list, const lkey_t key, val_t *val)
 *
 * listからkeyを持つnodeを検索し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool find(list_t * l, const lkey_t key)
{
    node_t *curr;
    node_t *pred = NULL;

    curr = search(l, key, &pred);
    if ((curr == l->tail) || (curr->key != key))
	return false;

    return true;
}

実行

ソースはGitHubに移行しました。



Last-modified: 2014-7-6