単方向リスト: 並行処理編 Part.1

ソースはGitHubに移行しました。

Fine-Grained Synchronization Singly-linked List(細粒度同期 単方向リスト) pthread_mutex編

先のリストは追加や削除、検索の基本操作毎にLockをかけるので、 同時にリストにアクセスできるのは1スレッドだけである。

以降では、もう少しLockの粒度を細かくして並行性を高め、 複数のスレッドが同時にリスト操作可能となるような改造を試みる。

ソースはGitHubに移行しました。

データ構造

データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。

nodeとlistのデータ構造を示す。

FineGrainedSynchroList.h:

typedef struct _node_t
{
  lkey_t key;           /* key */
  val_t val;            /* 値 */
  struct _node_t *next; /* 次のnodeへのポインタ */
  pthread_mutex_t mtx;  /* node毎のlock */
} node_t;

typedef struct _list_t
{
  node_t *head;
  node_t *tail;
} list_t;

node_tは(key,val)のペアと次のnodeへのポインタ、およびpthreadのmutex変数を保持する。

list_tには、リストの先頭headと末尾tailが確保される。

基本関数

listの生成
/*
 * list_t *init_list(void)
 *
 * listを生成する。
 *
 * 成功すればlistへのポインタを返す。失敗ならNULLを返す。
 *
 */
list_t *init_list(void)
{
    list_t *list;

    if ((list = (list_t *) calloc(1, sizeof(list_t))) == NULL) {
      elog("calloc error");
      return NULL;
    }
    
    if ((list->head = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
      elog("calloc error");
      free (list);
      goto end;
    }
    list->head->mtx = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
    
    if ((list->tail = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
      elog("calloc error");
      goto end;
    }
    list->tail->mtx = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
    
    list->head->next = list->tail;
    list->tail->next = NULL;
  
  return list;

 end :
  free (list->head);
  free (list);
  return NULL;
}
nodeの追加
/*
 * bool_t add(list_t * list, const lkey_t key, const val_t val)
 *
 * listにnode(key,val)を追加する。
 * 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool add(list_t * list, const lkey_t key, const val_t val)
{
    node_t *pred, *curr;
    node_t *newNode;
    bool ret = true;

    if ((newNode = create_node(key, val)) == NULL)
      return false;

    lock(list->head->mtx);  /* リストの先頭headのlockを取得 */

    pred = list->head;
    curr = pred->next;

    lock(curr->mtx);        /* headの次のノードcurrのlockを取得 */

    if (curr == list->tail) {      /* リストにノードがない場合 */
	list->head->next = newNode;
	newNode->next = list->tail;

	unlock(list->head->mtx);
	unlock(curr->mtx);
    } else {
      /* 
       * 1つか2つのlockを取得しながら、ノード検査を進める
       */
	while (curr != list->tail && curr->key < key) {
	  /* predとcurrのlockを取得中 */
	  unlock(pred->mtx);
	  /* currのlockのみ取得中 -> 以下で"curr"から"pred"に改名 */
	  pred = curr;  curr = curr->next; 
	  lock(curr->mtx); 
	  /* predとcurrのlockを取得中 */
	}

	/* この時点で成立する条件式:
	 *      (pred->key) < (newNode->key) <= (curr->key) 
	 */

	if (curr != list->tail && key == curr->key) {  /* すでに同じkeyを持つノードがある場合 */
	    node_free(newNode);
	    ret = false;
	} else {	                               /* ノードを追加 */
	    newNode->next = curr;
	    pred->next = newNode;
	}
	unlock(pred->mtx);
	unlock(curr->mtx);
    }

    return ret;
}
関数add()内部で呼ぶnode生成関数create_node()を示す。
/*
 * node_t *create_node(const lkey_t key, const val_t val)
 *
 * (key, val)を持つnodeを生成する。
 *
 * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
 */
static node_t *create_node(const lkey_t key, const val_t val)
{
    node_t *node;

    if ((node = calloc(1, sizeof(node_t))) == NULL) {
      elog("calloc error");
      return NULL;
    }
    node->mtx = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;

    node->key = key;
    node->val = val;

    return node;
}
nodeの削除
/*
 * bool_t delete(list_t * list, const lkey_t key, val_t *val)
 *
 * listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t delete(list_t * list, const lkey_t key, val_t *val)
{
    node_t *pred, *curr;
    bool_t ret = true;

    lock(list->head->mtx);  /* リストの先頭headのlockを取得 */

    pred = list->head;
    curr = pred->next;

    lock(curr->mtx);        /* headの次のノードcurrのlockを取得 */

    if (curr == list->tail) {      /* リストにノードがない場合 */
	unlock(list->head->mtx);
	unlock(curr->mtx);
	ret = false;
    } else {
      /* 
       * 1つか2つのlockを取得しながら、ノード検査を進める
       */
	while (curr != list->tail && curr->key < key) {
	    unlock(pred->mtx);
	    pred = curr;
	    curr = curr->next;
	    lock(curr->mtx);
	}

	/* この時点で成立する条件式:
	 *      (pred->key) < (newNode->key) <= (curr->key) 
	 */
	if (curr != list->tail && key == curr->key) {
	  *val = curr->val;
	  pred->next = curr->next;

	  unlock(curr->mtx);	  node_free(curr); /* currのlock解除と、ノードの開放 */
	  unlock(pred->mtx);	  /* currが完全に削除された後にpredのlockを解除 */
	} else {
	  unlock(pred->mtx);
	  unlock(curr->mtx);
	  
	  ret = false;
	}
    }
    return ret;
}
nodeの検索
/*
 * bool_t find(list_t * list, const lkey_t key, val_t *val)
 *
 * listからkeyを持つnodeを検索し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t find(list_t * list, const lkey_t key, val_t *val)
{
  node_t *pred, *curr;
  bool_t ret = true;
  
  lock(list->mtx);
  
  pred = list->head;
  curr = pred->next;
  
  if (curr == list->tail) {
    ret = false;
  } else {
    while (curr != list->tail && curr->key < key) {
      pred = curr;
      curr = curr->next;
    }
    if (curr != list->tail && key == curr->key) {
      *val = curr->val;
      ret = true;
    }
    else
      ret = false;
  }
  
  unlock(list->mtx);
  return ret;
}

実行

ソースはGitHubに移行しました。



Last-modified: 2014-7-6