Skiplist スキップリスト: 超基本編

ソースはGitHubに移行しました。

(単方向)Skiplistを実装する。
Skiplist は1990年にWilliam Pughによって発明されたアルゴリズムである。筆者は97年頃のUnix Magazineの記事で存在を知った。 その記事はネットワーク機器メーカーCISCO勤務の方が執筆しており、通常はネットワーク関連の話題が主なのだが、 唐突にアルゴリズムの話がはじまって驚いた覚えがある。 曰く「ルータの内部処理でルーティングテーブルにIPアドレスをキャッシュするが、 ネットワークの拡大に伴ってルーティングテーブル爆発が危惧されるとともに、 ルーティングテーブルの高速検索アルゴリズムが必要」なので興味があったらしい。
実際に使われているかどうかは知らないが、ネット時代のハードウエア人間はアルゴリズムにも詳しいと驚いた次第。

Skiplist(単方向スキップリスト)

すでに公開されているサンプルskiplist-1.1.tar.gz もあるが、この実装はユーザが排他制御しなければならないし、以降の改良のことも考えてフルスクラッチした。

ソースはGitHubに移行しました。

アルゴリズム

スキップリストの肝は、要素が乱数で決めた高さを持っていること。図示すると以下のようになる。

 level: head                            tail
   3   [-inf]------>[2]---------------->[inf]
   2   [-inf]------>[2]->[3]------>[7]->[inf]
   1   [-inf]->[1]->[2]->[3]->[5]->[7]->[inf]
   0   [-inf]->[1]->[2]->[3]->[5]->[7]->[inf]

高さ(レベル)毎に次の要素へのリンクを持っている。
ここで要素[5]を見つけるには次のようにする:

[5]を見つける:

 level: head                             tail
   3   [-inf]------>[2]- - - - - - - - -X[inf]
   2   [-inf]       [2]-->[3]- - - X[7]  [inf]
   1   [-inf]  [1]  [2]   [3]->[5]  [7]  [inf]
   0   [-inf]  [1]  [2]   [3]  [5]  [7]  [inf]

level=3から始める。
(1)最初にぶつかるのは要素[2]。
(2)目的の要素[5]はまだ先なので次に進むと末尾(tail)なので、levelを一つ下げる。
(3)要素[2]のlevel=2は要素[3]を指している。
(4)要素[3]のlevel=2は要素[7]を指しているので少し行き過ぎた。
(5)さらにlevelを下げて要素[3]のlevel=1から次に進むと要素[5]にぶつかる。

データ構造

データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。

以降、共通のデータ型として"common.h"に"bool_t"、"lkey_t"、"val_t"を定義する。 "bool_t"は自明、"lkey_t"はnodeのkey、"val_t"はnodeのvalを保持する。

common.h:

typedef bool  bool_t;
typedef int32_t lkey_t;
typedef int32_t val_t;

次に、nodeとlistのデータ構造を示す。

Skiplist .h:

typedef struct _skiplist_node_t {
  lkey_t key;                        /* key */
  val_t val;                         /* 値  */

  int topLevel;                      /* このノードのレベル(高さ) */
  struct _skiplist_node_t *next[1];  /* 次のnodeへのポインタ */
} skiplist_node_t;

typedef struct _skiplist_t {
  int	maxLevel;                    /* このリストの最大レベル(高さ) */

  skiplist_node_t *tail;
  skiplist_node_t *head;

  pthread_mutex_t mtx;

  /* add, delete, findのための補助メモリ領域 */
  skiplist_node_t **preds;
  skiplist_node_t **succs;
} skiplist_t;

skiplist_node_tは(key,val)のペアと次のノードへのポインタ、およびこのノードのレベル(高さ)を保持する。

skiplist_tには、リストの先頭headと末尾tail、およびlockのためのmutex変数が確保される。
また、各種操作(add,delete,find)のためにあらかじめ補助メモリ領域preds,succsを確保しておく。

基本関数

listの生成
/*
 * skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max)
 *
 * skiplistを生成する。maxLevelは最大の高さ(レベル)を指定。
 * minはスキップリストのkeyの最小値(skiplist->head->keyに保持)、
 * maxはスキップリストのkeyの最大値(skiplist->tail->keyに保持)、
 *
 * 成功すればskiplistへのポインタを返す。失敗ならNULLを返す。
 *
 */
skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max)
{
    skiplist_t *sl;
    skiplist_node_t *head, *tail;
    int i;

    if ((sl = (skiplist_t *) calloc(1, sizeof(skiplist_t))) == NULL) {
	elog("calloc error");
	return NULL;
    }

    sl->mtx = (pthread_mutex_t) PTHREAD_MUTEX_INITIALIZER;
    sl->maxLevel = maxLevel;

    if ((head = create_node(maxLevel, min, (val_t)NULL)) == NULL) {
      elog("create_node() error");
      goto end;
    }
    if ((tail = create_node(maxLevel, max, (val_t)NULL)) == NULL) {
      elog("create_node() error");
      goto end;
    }

    for (i = 0; i < maxLevel; i++) {
	head->next[i] = tail;
	tail->next[i] = NULL;
    }

    sl->head = head;
    sl->tail = tail;

    if ((sl->preds = (skiplist_node_t **) calloc(maxLevel, sizeof(skiplist_node_t *))) == NULL) {
	elog("calloc error");
	goto end;
    }
    if ((sl->succs = (skiplist_node_t **) calloc(maxLevel, sizeof(skiplist_node_t *))) == NULL) {
	elog("calloc error");
	goto end;
    }
    return sl;

 end:
    free(head);
    free(tail);
    free(sl->preds);
    free(sl);
    return NULL;
}
ノードの検索

各種操作(add,delete,find)が使うノードの検索関数を示す。

/*
 *static int search(skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds,
 *                                                     skiplist_node_t ** succs)
 *
 * keyを持つノードを探す。
 * ノードが見つかった場合:
 *  そのノードのレベル(高さ)を返す。またsuccsには目的のノードが保持するポインタ、
 *  predsにはsuccsを指すノードへのポインタを返す。
 *  例) 以下のスキップリストでkey[4]のノードを探す場合、
 *
 *level: head                                 tail
 *  3   [-inf]------>[2]--------------------->[inf]
 *  2   [-inf]------>[2]------>[4]------>[7]->[inf]
 *  1   [-inf]->[1]->[2]------>[4]->[5]->[7]->[inf]
 *  0   [-inf]->[1]->[2]->[3]->[4]->[5]->[7]->[inf]
 *   
 *  predsとsuccsはそれぞれ次のようになり、レベル2を返す。
 *                level: preds succs
 *                  2    [2]   [7]
 *                  1    [2]   [5]
 *                  0    [3]   [5]
 *   preds[2]=2は"レベル2では、ノード(key=2)がノード(key=4)を指している"、
 *   preds[1]=2は"レベル1では、ノード(key=2)がノード(key=4)を指している"、
 *   preds[0]=3は"レベル0では、ノード(key=3)がノード(key=4)を指している"、
 * を意味し、
 *   succs[2]=7は"レベル2では、ノード(key=4)がノード(key=7)を指している"、
 *   succs[1]=5は"レベル1では、ノード(key=4)がノード(key=5)を指している"、
 *   succs[0]=5は"レベル0では、ノード(key=4)がノード(key=5)を指している"、
 * を意味する。
 */
static int search(skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds,
		  skiplist_node_t ** succs)
{
    int lFound, level;
    skiplist_node_t *pred, *curr;

    pred = sl->head;
    lFound = -1;

    for (level = sl->maxLevel - 1; level >= 0; level--) {
      curr = pred->next[level];

      while (key > curr->key) {
	pred = curr;
	curr = pred->next[level];
      }
      
      if (lFound == -1 && key == curr->key) 
	    lFound = level;
      
      preds[level] = pred;
      succs[level] = curr;
    }

    return lFound;
}
nodeの追加

searchを行ったついでに、張り替えるリンク元とリンク先をsl->predsとsl->succsに保存しているのが肝。

/*
 * bool_t add(list_t * list, const lkey_t key, const val_t val)
 *
 * listにnode(key,val)を追加する。
 * 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t add(skiplist_t * sl, const lkey_t key, const val_t val)
{
    int level, topLevel, lFound;
    skiplist_node_t *newNode;
    int r = rand();
    bool_t ret = true;

    lock(sl->mtx);

    if ((lFound = search(sl, key, sl->preds, sl->succs)) != -1) {
      ret = false;
    } else {      
      topLevel = (r % sl->maxLevel);
      assert(0 <= topLevel && topLevel < sl->maxLevel);

      newNode = create_node(topLevel, key, val);
      
      for (level = 0; level <= topLevel; level++) {
	newNode->next[level] = sl->succs[level];
	sl->preds[level]->next[level] = newNode;
      }
    }

    unlock(sl->mtx);

    return ret;
}
関数add()内部で呼ぶnode生成関数create_node()を示す。
/*
 * static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val)
 *
 * レベル(高さ)topLevelで(key, val)を持つnodeを生成する。
 *
 * 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
 */
#define node_size(level)	  (sizeof(skiplist_node_t) + (level * sizeof(skiplist_node_t *)))

static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val)
{
    skiplist_node_t *node;

    if ((node = (skiplist_node_t *) calloc(1, node_size(topLevel))) == NULL) {
	elog("calloc error");
	return NULL;
    }

    node->key = key;
    node->val = val;
    node->topLevel = topLevel;

    return node;
}
nodeの削除

追加が分かれば削除もほぼ自明。

/*
 * bool_t delete(skiplist_t * sl, lkey_t key, val_t * val)
 *
 * listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
 * keyが存在しない場合は失敗。
 *
 * 成功すればtrue、失敗すればfalseを返す。
 */
bool_t delete(skiplist_t * sl, lkey_t key, val_t * val)
{
    int lFound, level;
    skiplist_node_t *victim = NULL;
    bool_t ret = true;

    lock(sl->mtx);

    if ((lFound = search(sl, key, sl->preds, sl->succs)) == -1) {
      ret = false;
    } else {

      victim = sl->succs[lFound];
      for (level = victim->topLevel; level >= 0; level--)
	sl->preds[level]->next[level] = victim->next[level];
      
      *val = victim->val;
      free_node(victim);
    }

    unlock(sl->mtx);

    return true;
}
nodeの検索
/*
 * val_t find(skiplist_t * sl, lkey_t key)
 *
 * skiplistからkeyを持つノードを検索する。ノードが存在する場合はそのノードのvalを返す。
 * keyが存在しない場合はNULLを返す。
 *
 */
val_t find(skiplist_t * sl, lkey_t key)
{
    int lFound;
    val_t ret;

    lock(sl->mtx);

    if ((lFound = search(sl, key, sl->preds, sl->succs)) == -1)
      ret = (val_t)NULL;
    else
      ret = sl->preds[0]->next[0]->val;
    
    unlock(sl->mtx);

    return ret;
}

実行

ソースはGitHubに移行しました。



Last-modified: 2014-7-6