|
|
|
Skiplist スキップリスト: 並行処理編 Part. 1ソースはGitHubに移行しました。 Lazy Skiplist(単方向 Lazy スキップリスト)
(単方向)Lazy Skiplistを実装する: ソースはGitHubに移行しました。 データ構造データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。 以降、共通のデータ型として"common.h"に"bool_t"、"lkey_t"、"val_t"を定義する。 "bool_t"は自明、 "lkey_t"はnodeのkey、"val_t"はnodeのvalを保持する。 common.h: typedef bool bool_t; typedef uint64_t lkey_t; typedef int64_t val_t; 次に、nodeとlistのデータ構造を示す。
LazySkiplist .h:
typedef struct _skiplist_node_t {
lkey_t key; /* key */
val_t val; /* 値 */
int topLevel; /* このノードのレベル(高さ) */
bool_t marked; /* 論理的に削除されているか否か示す */
bool_t fullyLinked;
pthread_mutex_t mtx;
struct _skiplist_node_t *next[1]; /* 次のnodeへのポインタ */
} skiplist_node_t;
typedef struct _skiplist_t {
int maxLevel; /* このリストの最大レベル(高さ) */
skiplist_node_t *head;
skiplist_node_t *tail;
pthread_key_t workspace_key; /* add, delete, findのための補助メモリ領域用pthread_key */
} skiplist_t;
typedef struct _workspace_t { /* add, delete, findのための補助メモリ領域。スレッド毎に保持する。 */
skiplist_node_t **preds;
skiplist_node_t **succs;
} workspace_t;
skiplist_node_tは(key,val)のペアと次のノードへのポインタ、およびこのノードのレベル(高さ)を保持する。 また、markedとfullyLinkedの2つのマーク(bool_t)と、 ノード毎のlockのためにpthreadのmutex変数を確保する。
skiplist_tにはリストの先頭headと末尾tailが確保される。 具体的には、pthread_keyの機能を利用する。 skiplist_tにはpthread_keyのみ確保し別途、構造体workspace_tを用意してスレッド毎にメモリ領域を確保する。 基本関数スレッド毎のメモリ領域確保pthread_keyを知っていれば自明なコードである。
concurrent_skiplist.h:
/* static workspace_t *init_workspace(const int maxLevel)
*
* (スレッド毎に)skiplist *slの補助メモリ領域:predsとsuccsを確保する。
*
* 成功すると確保したメモリへのポインタを返す。失敗するとNULLを返す。
*/
static workspace_t *init_workspace(const int maxLevel)
{
workspace_t *ws;
if ((ws = (workspace_t *) calloc(1, sizeof(workspace_t))) == NULL) {
elog("calloc error");
return NULL;
}
if ((ws->preds = (skiplist_node_t **) calloc(1, sizeof(skiplist_node_t *) * maxLevel)) == NULL) {
elog("calloc error");
goto end;
}
if ((ws->succs = (skiplist_node_t **) calloc(1, sizeof(skiplist_node_t *) * maxLevel)) == NULL) {
elog("calloc error");
goto end;
}
return ws;
end:
free(ws->preds);
free(ws);
return (workspace_t *) NULL;
}
/*
* void free_workspace(workspace_t * ws)
*
* wsを開放する。
*/
static void free_workspace(workspace_t * ws)
{
free(ws->preds);
free(ws->succs);
free(ws);
}
/*
* workspace_t *get_workspace(skiplist_t * sl)
*
* (スレッド毎に)初めて呼ばれた時はメモリ領域へ確保し、pthread_keyに対応させる。
* 必ずスレッドに対応したメモリ領域へのポインタを返す。
*
*/
static workspace_t *get_workspace(skiplist_t * sl)
{
/* スレッド毎のメモリ領域へのポインタを得る */
workspace_t *workspace = pthread_getspecific(sl->workspace_key);
/* まだメモリ領域が確保されていない場合: */
if (workspace == NULL) {
/* メモリ領域の確保 */
if ((workspace = init_workspace(sl->maxLevel)) != NULL) {
/* メモリ領域をpthread_keyに対応させる */
if (pthread_setspecific(sl->workspace_key, (void *) workspace) != 0) {
elog("pthread_setspecific() error");
abort();
}
} else {
elog("init_workspace() error");
abort();
}
}
assert(workspace != NULL);
return workspace;
}
少し先行して、使い方を示す。
bool_t add(skiplist_t * sl, lkey_t key, val_t val)
{
workspace_t *ws = get_workspace(sl); /* スレッド毎に確保された補助メモリ領域wsへのポインタを返す */
assert(ws != NULL);
return _add(sl, ws->preds, ws->succs, key, val); /* 実際にノードの追加を行う */
}
listの生成
/*
* skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max)
*
* skiplistを生成する。maxLevelは最大の高さ(レベル)を指定。
* minはスキップリストのkeyの最小値(skiplist->head->keyに保持)、
* maxはスキップリストのkeyの最大値(skiplist->tail->keyに保持)、
*
* 成功すればskiplistへのポインタを返す。失敗ならNULLを返す。
*
*/
skiplist_t *init_list(const int maxLevel, const lkey_t min, const lkey_t max)
{
int i;
skiplist_t *sl;
skiplist_node_t *head, *tail;
if ((sl = (skiplist_t *) calloc(1, sizeof(skiplist_t))) == NULL) {
elog("calloc error");
return NULL;
}
sl->maxLevel = maxLevel;
if ((head = create_node(maxLevel, min, min)) == NULL) {
elog("create_node() error");
goto end;
}
head->fullyLinked = true;
if ((tail = create_node(maxLevel, max, max)) == NULL) {
elog("create_node() error");
goto end;
}
tail->fullyLinked = true;
for (i = 0; i < maxLevel; i++) {
head->next[i] = tail;
tail->next[i] = NULL;
}
sl->head = head;
sl->tail = tail;
/* pthread_key_t workspace_keyの初期化
* 確保したメモリ領域の開放はfree_workspace()@concurrent_skiplist.hで行う。
*/
if (pthread_key_create(&sl->workspace_key, (void *) free_workspace) != 0) {
elog("pthread_key_create() error");
abort();
}
return sl;
end:
free(sl->head);
free(sl);
return NULL;
}
ノードの検索各種操作(add,delete,find)が使うノードの検索関数を示す。
/*
*static int search(skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds,
* skiplist_node_t ** succs)
*
* keyを持つノードを探す。
* ノードが見つかった場合:
* そのノードのレベル(高さ)を返す。またsuccsには目的のノードが保持するポインタ、
* predsにはsuccsを指すノードへのポインタを返す。
* 例) 以下のスキップリストでkey[4]のノードを探す場合、
*
*level: head tail
* 3 [-inf]------>[2]--------------------->[inf]
* 2 [-inf]------>[2]------>[4]------>[7]->[inf]
* 1 [-inf]->[1]->[2]------>[4]->[5]->[7]->[inf]
* 0 [-inf]->[1]->[2]->[3]->[4]->[5]->[7]->[inf]
*
* predsとsuccsはそれぞれ次のようになり、レベル2を返す。
* level: preds succs
* 2 [2] [7]
* 1 [2] [5]
* 0 [3] [5]
* preds[2]=2は"レベル2では、ノード(key=2)がノード(key=4)を指している"、
* preds[1]=2は"レベル1では、ノード(key=2)がノード(key=4)を指している"、
* preds[0]=3は"レベル0では、ノード(key=3)がノード(key=4)を指している"、
* を意味し、
* succs[2]=7は"レベル2では、ノード(key=4)がノード(key=7)を指している"、
* succs[1]=5は"レベル1では、ノード(key=4)がノード(key=5)を指している"、
* succs[0]=5は"レベル0では、ノード(key=4)がノード(key=5)を指している"、
* を意味する。
*/
static int search(const skiplist_t * sl, const lkey_t key, skiplist_node_t ** preds,
skiplist_node_t ** succs)
{
int lFound, level;
skiplist_node_t *pred, *curr;
pred = sl->head;
lFound = -1;
for (level = sl->maxLevel - 1; level >= 0; level--) {
curr = pred->next[level];
while (key > curr->key) {
pred = curr;
curr = pred->next[level];
}
if (lFound == -1 && key == curr->key)
lFound = level;
preds[level] = pred;
succs[level] = curr;
}
return lFound;
}
ノードの追加
/*
* bool_t add(list_t * list, const lkey_t key, const val_t val)
*
* listにnode(key,val)を追加する。
* 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。
*
* 成功すればtrue、失敗すればfalseを返す。
*/
bool_t add(skiplist_t * sl, lkey_t key, val_t val)
{
workspace_t *ws = get_workspace(sl); /* スレッド毎に確保された補助メモリ領域wsへのポインタを返す */
assert(ws != NULL);
return _add(sl, ws->preds, ws->succs, key, val);
}
static bool_t
_add(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs,
const lkey_t key, const val_t val)
{
int topLevel, highestLocked, lFound, level;
skiplist_node_t *newNode, *pred, *succ, *nodeFound;
bool_t valid;
int r = rand();
topLevel = (r % sl->maxLevel);
assert(0 <= topLevel && topLevel < sl->maxLevel);
/*
* step 1: ノードの検索
*/
while (1) {
if ((lFound = search(sl, key, preds, succs)) != -1) {
nodeFound = succs[lFound];
if (nodeFound->marked != true) {
while (nodeFound->fullyLinked != true) {};
return false;
}
continue;
}
/*
* step 2: preds[]に含まれるノードのロック
*/
highestLocked = -1;
valid = true;
for (level = 0; (valid == true) && (level <= topLevel); level++) {
pred = preds[level];
succ = succs[level];
lock(pred->mtx); /* levelによって複数回lockされる可能性がある -> RECURSIVE_MUTEX */
highestLocked = level;
valid = (pred->marked != true) && (succ->marked != true)
&& (pred->next[level] == succ);
}
if (valid != true) {
for (level = 0; level <= highestLocked; level++)
unlock(preds[level]->mtx);
continue;
}
assert(valid == true);
for (level = 0; (level <= topLevel); level++) {
pred = preds[level]; succ = succs[level];
assert(pred->marked != true); assert(succ->marked != true);
assert(pred->next[level] == succ);
}
/*
* step 3: ノード追加
*/
newNode = create_node(topLevel, key, val);
for (level = 0; level <= topLevel; level++) {
newNode->next[level] = succs[level];
preds[level]->next[level] = newNode;
}
newNode->fullyLinked = true;
break;
}
/*
* step 4: ロックの解除
*/
assert(highestLocked == topLevel);
for (level = 0; level <= topLevel; level++)
unlock(preds[level]->mtx);
return true;
}
関数add()内部で呼ぶnode生成関数create_node()を示す。
/*
* static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val)
*
* レベル(高さ)topLevelで(key, val)を持つnodeを生成する。
*
* 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
*/
#define node_size(level) (sizeof(skiplist_node_t) + ((level + 1) * sizeof(skiplist_node_t *)))
static skiplist_node_t *create_node(const int topLevel, const lkey_t key, const val_t val)
{
skiplist_node_t *node;
#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
pthread_mutexattr_t mtx_attr;
#endif
if ((node = (skiplist_node_t *) calloc(1, node_size(topLevel))) == NULL) {
elog("calloc error");
return NULL;
}
node->key = key;
node->val = val;
node->topLevel = topLevel;
#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
node->mtx = (pthread_mutex_t) PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
#else
pthread_mutexattr_init(&mtx_attr);
pthread_mutexattr_settype(&mtx_attr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&node->mtx, &mtx_attr);
#endif
node->marked = false;
node->fullyLinked = false;
return node;
}
nodeの削除
/*
* bool_t delete(skiplist_t * sl, lkey_t key, val_t * val)
*
* listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
* keyが存在しない場合は失敗。
*
* 成功すればtrue、失敗すればfalseを返す。
*/
bool_t delete(skiplist_t * sl, const lkey_t key, val_t * val)
{
workspace_t *ws = get_workspace(sl);
assert(ws != NULL);
return _delete(sl, ws->preds, ws->succs, key, val);
}
static bool_t
_delete(skiplist_t * sl, skiplist_node_t ** preds, skiplist_node_t ** succs,
const lkey_t key, val_t * val)
{
int topLevel = -1;
int i, lFound, level, highestLocked;
skiplist_node_t *pred, *victim;
bool_t isMarked = false;
bool_t valid, flag;
while (1) {
/*
* step 1: 削除するノードの検索
*/
victim = NULL;
if ((lFound = search(sl, key, preds, succs)) == -1) /* 削除するノードが存在しない場合 */
return false;
victim = succs[lFound];
flag = ((victim->fullyLinked == true) && (victim->topLevel == lFound) && (victim->marked != true));
/* 初回、もしくはのみ実行
* => victim->marked = true; isMarked = true; topLevel = victim->topLevel;
*/
if (isMarked == true || ((lFound != -1) && flag)) {
if (isMarked != true) {
topLevel = victim->topLevel;
lock(victim->mtx);
if (victim->marked == true) {
unlock(victim->mtx);
return false;
}
victim->marked = true;
isMarked = true;
}
/*
* step 2: preds[]のロック獲得
*/
highestLocked = -1;
valid = true;
for (level = 0; (valid == true) && (level <= topLevel); level++) {
pred = preds[level];
lock(pred->mtx);
highestLocked = level;
valid = (pred->marked != true) && (pred->next[level] == victim);
}
if (valid != true) { /* preds[]のロック獲得に失敗した場合、やりなおし */
for (i = 0; i <= highestLocked; i++) unlock(preds[i]->mtx);
continue;
}
/*
* step 3: ノードの削除
*/
for (level = victim->topLevel; level >= 0; level--)
preds[level]->next[level] = victim->next[level];
unlock(victim->mtx);
for (i = 0; i <= lFound; i++) /* preds[]のロック開放 */
unlock(preds[i]->mtx);
break;
}
else
return false;
}
*val = victim->val;
free_node(victim);
return true;
}
nodeの検索
/*
* val_t find(skiplist_t * sl, lkey_t key)
*
* skiplistからkeyを持つノードを検索する。ノードが存在する場合はそのノードのvalを返す。
* keyが存在しない場合はNULLを返す。
*
*/
val_t find(skiplist_t * sl, lkey_t key)
{
workspace_t *ws = get_workspace(sl);
assert(ws != NULL);
return _find(sl, ws->preds, ws->succs, key);
}
static bool_t _find(skiplist_t * sl, skiplist_node_t ** preds,
skiplist_node_t ** succs, const lkey_t key)
{
int lFound;
lFound = search(sl, key, preds, succs);
if (lFound == -1)
return (val_t) NULL;
if (succs[lFound]->fullyLinked != true || succs[lFound]->marked == true)
return (val_t) NULL;
return preds[0]->next[0]->val;
}
実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|