|
|
|
単方向リスト: 並行処理編 Part 3ソースはGitHubに移行しました。 Non-Blocking Singly-linked List(Non-Blocking単方向リスト)Non-Blocking Slingly-Linked Listについて、ここではCASベースの比較的単純なアルゴリズムを実装した。 "A Pragmatic Implementation of Non-Blocking Linked-Lists" Timothy L. Harris .pdf アルゴリズムを簡単に図示する: ![]() 図の下部は今回実装したアルゴリズムで、削除するノードにマーキングして上記の状況を避ける。 ソースはGitHubに移行しました。 データ構造データ構造list_tとnode_tを定義する。 list_tはListの本体、node_tはListに連結するnodeのデータ構造である。
nodeとlistのデータ構造を示す。
NonBlockingList.h:
typedef enum {UNMARKED = 0, MARKED = 1} node_stat;
typedef struct _next_ref {
node_stat mark;
void *node_ptr;
}__attribute__((packed)) next_ref;
typedef struct _node_t
{
lkey_t key; /* key */
val_t val; /* 値 */
next_ref next; /* 次のnodeへのリファレンス */
}__attribute__((packed)) node_t;
typedef struct _list_t
{
node_t *head;
node_t *tail;
} list_t;
list_tには、リストの先頭headと末尾tailが確保される。 基本関数listの生成
/*
* list_t *init_list(void)
*
* listを生成する。
*
* 成功すればlistへのポインタを返す。失敗ならNULLを返す。
*
*/
list_t *init_list(void)
{
list_t *list;
if ((list = (list_t *) calloc(1, sizeof(list_t))) == NULL) {
elog("calloc error");
return NULL;
}
if ((list->head = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
goto end;
}
if ((list->tail = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
goto end;
}
list->head->key = INT_MIN;
list->head->next = make_ref(list->tail, UNMARKED);
list->tail->key = INT_MAX;
list->tail->next = make_ref(NULL, UNMARKED);
return list;
end:
free (list->head);
free (list);
return NULL;
}
ノードの検索
static inline bool_t
cas64(volatile next_ref * addr, const next_ref oldp, const next_ref newp)
{
char result;
__asm__ __volatile__("lock; cmpxchg8b %0; setz %1":"=m"(*(volatile next_ref *)addr),
"=q"(result)
:"m"(*(volatile next_ref *)addr), "a"(oldp.mark), "d"(oldp.node_ptr),
"b"(newp.mark), "c"(newp.node_ptr)
:"memory");
return ((int)result == true ? true : false);
}
#define is_marked_ref(ref) (ref.mark == MARKED ? true:false)
#define is_unmarked_ref(ref) (ref.mark == UNMARKED ? true:false)
#define get_ptr(ref) (ref.node_ptr)
static next_ref make_ref(node_t * node_ptr, const node_stat mark)
{
next_ref ref;
ref.mark = mark;
ref.node_ptr = (node_t *) node_ptr;
return ref;
}
#define get_unmarked_ref(ptr) make_ref(ptr, UNMARKED)
#define get_marked_ref(ptr) make_ref(ptr, MARKED)
static node_t *search(list_t * list, const lkey_t key, node_t **pred)
{
node_t *pred_next = NULL;
node_t *curr;
search_again:
do {
node_t *t = list->head;
node_t *t_next = (node_t *)get_ptr(list->head->next);
/* step 1: find pred and curr */
do {
if (!is_marked_ref(t_next->next)) {
(*pred) = t;
pred_next = t_next;
}
t = t_next; t->next.mark = (int)UNMARKED;
if (t == list->tail)
break;
t_next = (node_t *)get_ptr(t->next);
} while (t->key < key || is_marked_ref(t_next->next));
assert(key <= t->key || is_unmarked_ref(t_next->next));
curr = t;
/* step 2: check nodes are adjacent */
if (pred_next == curr) {
if ((curr != list->tail) && (is_marked_ref(curr->next))) {
goto search_again;
} else {
assert (pred_next == curr);
return curr;
}
}
/* step 3: remove one or more marked nodes */
if (cas64(&(*pred)->next, make_ref(pred_next, MARKED), make_ref(curr, UNMARKED)) == true) {
if (curr != list->tail && is_marked_ref(curr->next)) {
goto search_again;
} else {
assert (get_ptr((*pred)->next) == curr);
return curr;
goto end; // dummy
}
}
}
while (1);
end:
assert (get_ptr((*pred)->next) == curr);
return curr;
}
ノードの追加
/*
* bool_t add(list_t * list, const lkey_t key, const val_t val)
*
* listにnode(key,val)を追加する。
* 追加位置はkeyの昇順に従う。すでに同じkeyを持つnodeがある場合は失敗。
*
* 成功すればtrue、失敗すればfalseを返す。
*/
bool add(list_t * list, const lkey_t key, const val_t val)
{
node_t *pred = NULL;
node_t *curr;
node_t *newNode;
if ((newNode = create_node(key, val)) == NULL)
return false;
do {
curr = search(list, key, &pred);
assert(pred->key < key && key <= curr->key);
if ((curr != list->tail) && (curr->key == key)) {
free_node(newNode);
return false;
}
newNode->next = make_ref(curr, UNMARKED);
if (cas64(&pred->next, make_ref(curr, UNMARKED), make_ref(newNode, UNMARKED)) == true) {
break;
}
}
while (1);
return true;
}
関数add()内部で呼ぶnode生成関数create_node()を示す。
/*
* node_t *create_node(const lkey_t key, const val_t val)
*
* (key, val)を持つnodeを生成する。
*
* 成功すればnodeへのポインタを返す。失敗すればNULLを返す。
*/
static node_t *create_node(const lkey_t key, const val_t val)
{
node_t *node;
if ((node = (node_t *) calloc(1, sizeof(node_t))) == NULL) {
elog("calloc error");
return NULL;
}
node->next = make_ref(NULL, UNMARKED);
node->key = key;
node->val = val;
return node;
}
ノードの削除ノードの削除も基本戦略は追加と同じである。
/*
* bool_t delete(list_t * list, const lkey_t key, val_t *val)
*
* listからkeyを持つnodeを削除し、そのnodeのvalを*valに書き込む。
* keyが存在しない場合は失敗。
*
* 成功すればtrue、失敗すればfalseを返す。
*/
bool_t delete(list_t * list, const lkey_t key, val_t *val)
{
node_t *pred = NULL;
node_t *curr, *curr_next;
bool_t ret = true;
do {
curr = search(list, key, &pred);
assert(pred->key < key && key <= curr->key);
if ((curr == list->tail) || (curr->key != key)) {
#ifdef _DEBUG_
if (curr->key != key)
printf ("delete ERROR: key = %ld curr->key %ld\n", (long int)key, (long int)curr->key);
#endif
return false;
}
curr_next = (node_t *)get_ptr(curr->next);
if (is_unmarked_ref(curr_next->next)) {
node_t *node_ptr = (node_t *)get_ptr(curr->next);
if (cas64(&curr->next, make_ref(node_ptr, curr->next.mark), get_marked_ref(node_ptr)) == true)
break;
}
}
while (1);
if (cas64(&pred->next, get_unmarked_ref(curr), get_unmarked_ref(curr_next)) != true) {
curr = search(list, curr->key, &pred);
}
*val = curr->val;
free_node(curr);
return ret;
}
ノードの検索
/*
* bool_t find(list_t * list, const lkey_t key, val_t *val)
*
* listからkeyを持つnodeを検索し、そのnodeのvalを*valに書き込む。
* keyが存在しない場合は失敗。
*
* 成功すればtrue、失敗すればfalseを返す。
*/
bool find(list_t * l, const lkey_t key)
{
node_t *curr;
node_t *pred = NULL;
curr = search(l, key, &pred);
if ((curr == l->tail) || (curr->key != key))
return false;
return true;
}
実行ソースはGitHubに移行しました。
Last-modified: 2014-7-6
|