UnBounded Queue: 並行処理編 Part 3

ソースはGitHubに移行しました。

LL/SC based Lock-Free Queue

LL/SCを利用したLock-Free Queueの実装。
原論文は"Bringing practical lock-free synchronization to 64-bit applications"
このアルゴリズムもよい特徴を備えている。

  • スレッド数に制限がないこと(予めスレッド数の上限を設定しなくてよい)
  • 使われるメモリ領域はqueueの要素数に比例し、各スレッドも8バイト(X86環境)か16バイト(X86_64環境)程度しか使わない


  • LL/SCといってもIntelのCPUはサポートしていないので、CAS(cmpxchg)でエミュレートしている。 汎用的なエミュレートでなく、あくまでQueueの実装でのみ有効な方式に見える(まだしっかり把握できていないので要調査)。
    LL/SCはABA問題が生じないので、 素直にアドレス値に対してCAS命令が使えるため、この実装はintel64(X86_64)環境でも動作する。

    ちなみに、 lucille development blogでも同じ実装が公開されている。 「見たら負けかな」と思ったので実装を終えるまでソースをよく見なかったが、改めて眺めてみるとほとんど同じである。 これは論文の疑似コードが 限りなくC言語に近いため。しかし、関数の並びまで同じなのは笑った。

    ソースはGitHubに移行しました。

    データ構造

    データ構造は完全に論文通り。唯一、スレッド毎のデータ領域確保のためpthread_keyを定義している。

    LLSCLockFreeQueue.h:
    typedef struct _ExitTag {
      int count;
      int transfersLeft;
      bool_t nlP;
      bool_t toBeFreed;
    } ExitTag;
    
    typedef struct _node_t {
      val_t val;
    
      struct _node_t *next;
      struct _node_t *pred;
      ExitTag exit;
    } node_t;
    
    typedef struct _EntryTag {
      int ver;
      int count;
    } EntryTag;
    
    
    typedef struct _LLSCvar {
      node_t *ptr0;
      node_t *ptr1;
      EntryTag entry;
    } LLSCvar;
    
    
    typedef struct _queue_t {
      LLSCvar head;
      LLSCvar tail;
      pthread_key_t workspace_key;
    } queue_t;
    
    
    typedef struct _workspace_t {
      int myver;
      node_t *mynode __attribute__((aligned(16)));
    } workspace_t __attribute__((aligned(16)));
    
    

    基本関数

    Lock-Freeのためのプリミティブ

    CAS(cmpxchg系)でLL/SCをエミュレートする。詳細は論文参照。 今はソースを書き散らかしているだけだが、いずれ内容を噛み砕いて解説したいと思っている。

    #ifdef __X86_64__
    static inline bool_t cas(void *ptr, uint64_t oldv, uint64_t newv)
    {
      uint64_t result;
      __asm__ __volatile__("lock\n cmpxchgq %1,%2"
                           : "=a" (result)
                           : "q" (newv), "m" (*(uint64_t *)ptr),"0" (oldv)
                           : "memory");
      return ((result == oldv) ? true : false);
    }
    #define CAST(value)   (*((uint64_t *)&(value)))
    
    #else
    static inline bool_t cas(void *ptr, uint32_t oldv, uint32_t newv)
    {
      uint32_t result;
      __asm__ __volatile__("lock\n cmpxchgl %1,%2"
                           : "=a" (result)
                           : "q" (newv), "m" (*(uint32_t *)ptr),"0" (oldv)
                           : "memory");
      return ((result == oldv) ? true : false);
    }
    
    #define CAST(value)   (*((uint32_t *)&(value)))
    #endif
    
    
    #define CURRENT(loc, ver)  (ver % 2 == 0 ? loc->ptr0 : loc->ptr1)
    #define NONCURADDR(loc, ver)  (ver % 2 == 0 ? (void *)&(loc->ptr1) : (void *)&(loc->ptr0))
    #define CLEAN(exit)     ((exit.count == 0) && (exit.transfersLeft == 0))
    #define FREEABLE(exit)  (CLEAN(exit) &&  exit.nlP && exit.toBeFreed)
    
    
    static node_t 
    *LL(LLSCvar *loc, int *myver, node_t **mynode)
    {
      EntryTag e, new;
      do {
        e = loc->entry;
        *myver = e.ver;
        *mynode = CURRENT(loc, e.ver);
        {
          new.ver = e.ver;
          new.count = e.count + 1;
        }
      } while (!cas(&loc->entry, CAST(e), CAST(new)));
    
      return *mynode;
    }
    
    static bool_t
    SC(LLSCvar *loc, node_t *nd, int myver, node_t *mynode)
    {
      EntryTag e, new;
      node_t *pred_nd = mynode->pred;
      bool_t success = cas(NONCURADDR(loc, myver), CAST(pred_nd), CAST(nd));
    
      /****
      if (!success)
        free(new_nd);
      ***/
    
      e = loc->entry;
      while (e.ver == myver) {
        {
          new.ver = e.ver + 1;
          new.count = 0;
        }
        if (cas(&loc->entry, CAST(e), CAST(new)))
          transfer(mynode, e.count);
        e = loc->entry;
      }
      release(mynode);
      return success;
    }
    
    static void 
    transfer(node_t *nd, int count)
    {
      ExitTag pre, post;
      do {
        pre = nd->exit;
        {
          post.count = pre.count + count;
          post.transfersLeft = pre.transfersLeft - 1;
          post.nlP = pre.nlP;
          post.toBeFreed = pre.toBeFreed;
        }
      } while (!cas(&nd->exit, CAST(pre), CAST(post)));
    }
    
    
    static void 
    release(node_t *nd)
    {
      ExitTag pre, post;
      node_t *pred_nd = nd->pred;
    
      do {
        pre = nd->exit;
        {
          post.count = pre.count - 1;
          post.transfersLeft = pre.transfersLeft;
          post.nlP = pre.nlP;
          post.toBeFreed = pre.toBeFreed;
        }
      } while (!cas(&nd->exit, CAST(pre), CAST(post)));
      
      if (CLEAN(post))
        setNLPred(pred_nd);
    
      if (FREEABLE(post))
        free(nd);
    }
    
    
    static void 
    unlink(LLSCvar *loc, int myver, node_t *mynode)
    {
      EntryTag e, new;
      do {
        e = loc->entry;
      } while (e.ver == myver);
    
      {
        new.ver = e.ver;
        new.count = e.count - 1;
      }
      if (!cas(&loc->entry, CAST(e), CAST(new)))
        release(mynode);
    }
    
    static void 
    setNLPred(node_t *pred_nd)
    {
      ExitTag pre, post;
        do {
        pre = pred_nd->exit;
        {
          post.count = pre.count;
          post.transfersLeft = pre.transfersLeft;
          post.nlP = true;
          post.toBeFreed = pre.toBeFreed;
        }
        } while (!cas(&pred_nd->exit, CAST(pre), CAST(post)));
      if (FREEABLE(post))
        free(pred_nd);
    }
    
    static void
    setToBeFreed(node_t *pred_nd) 
    {
      ExitTag pre, post;
    
      do {
        pre = pred_nd->exit;
        {
          post.count = pre.count;
          post.transfersLeft = pre.transfersLeft;
          post.nlP = pre.nlP;
          post.toBeFreed = true;
        }
      } while (!cas(&pred_nd->exit, CAST(pre), CAST(post)));
      
      if (FREEABLE(post)) 
        free(pred_nd);
    }
    
    queueの生成
    queue_t *init_queue (void)
    {
      queue_t *q;
    
      if ((q = (queue_t *) calloc(1, sizeof(queue_t))) == NULL) {
        elog("calloc error");
        return NULL;
      }
    
      q->tail.entry.ver = 0;
      q->tail.entry.count = 0;
      
      if ((q->tail.ptr0 = (node_t *)calloc(1, sizeof(node_t))) == NULL) {
        elog("calloc error");
        goto end;
      }
      if ((q->tail.ptr1 = (node_t *)calloc(1, sizeof(node_t))) == NULL) {
        elog("calloc error");
        goto end;
      }
      
      q->tail.ptr0->pred = q->tail.ptr1;
      
      q->tail.ptr0->exit.count = 0;
      q->tail.ptr0->exit.transfersLeft = 2;
      q->tail.ptr0->exit.nlP = 0;
      q->tail.ptr0->exit.toBeFreed = 0;
    
      q->tail.ptr0->next = NULL;
      
      q->tail.ptr1->exit.count = 0;
      q->tail.ptr1->exit.transfersLeft = 0;
      q->tail.ptr1->exit.nlP = 0;
      q->tail.ptr1->exit.toBeFreed = 0;
      
      q->head = q->tail;
    
      if (pthread_key_create(&q->workspace_key, (void *) free_workspace) != 0) {
        elog("pthread_key_create() error");
        abort();
      }
    
      return q;
    
     end:
      free(q->tail.ptr0);
      free(q);
      return NULL;
    }
    
    void
    free_queue(queue_t *q)
    {
      free(q);
    }
    
    enqueue
    bool_t enq(queue_t *q, val_t val)
    {
      bool_t ret = true;
      node_t *nd, *tail;
      workspace_t *ws = get_workspace(q);
      assert(ws != NULL);
    
      if ((nd = create_node(val)) == NULL) {
        return false;
      }
    
      while (1) {
        tail = LL(&q->tail, &ws->myver, &ws->mynode);
    
        nd->pred = tail;
        if (cas(&tail->next, (uintptr_t)NULL, CAST(nd))) {
          SC(&q->tail, nd, ws->myver, ws->mynode);
          break;
        } 
        else {
          SC(&q->tail, tail->next, ws->myver, ws->mynode);
        }
      }
      return ret;
    }
    
    static node_t *create_node (val_t val)
    {
      node_t *node;
      if ((node = (node_t*)calloc(1, sizeof(node_t))) == NULL) {
        elog("calloc error");
        return NULL;
      }
      node->val = val;
      node->next = NULL;
      node->exit.count = 0;
      node->exit.transfersLeft = 2;
      node->exit.nlP = false;
      node->exit.toBeFreed = false;
    
      return node;
    }
    
    dequeue
    bool_t deq(queue_t *q, val_t *val) 
    {
      bool_t ret = true;
      node_t *head, *next;
      workspace_t *ws = get_workspace(q);
      assert(ws != NULL);
      
      while (1) {
        head = LL(&q->head, &ws->myver, &ws->mynode);
        next = head->next;
        if (next == NULL) {
          unlink(&q->head, ws->myver, ws->mynode);
          *val = (val_t)NULL;
          ret = false;
          break;
        }
        
        if (SC(&q->head, next, ws->myver, ws->mynode)) {
          *val = next->val;
          setToBeFreed(next);
          free_node(next);
          break;
        }
      }
      return ret;
    }
    

    実行

    ソースはGitHubに移行しました。



    Last-modified: 2014-7-6