无锁编程

所谓无锁编程,就是用来编写不显示使用锁的并发程序技术。程序员转而依靠由硬件支持的原语操作来避免数据竞争。这里的原语操作在C++标准库里值得就是使用原子类型的原子操作,同时相较传统的原语操作,C++标准提供的原子操作是可移植的。原子操作的背后当然是由C++提供的内存模型支持,详细可我之前的博客

本文主要介绍如何利用C++标准提供的原子类型进行无锁编程,实现两个常用的数据结构,它们不仅是线程安全的,而且较有锁编程,无锁编程最大限度提升并发度,这主要得益于无锁编程:

  • 非阻塞,甚至可能是免等的。总是存在某个线程能执行下一步操作;

  • 利用特定内存次序做特定的操作。

无锁编程的另外一个优点是代码健壮性,因为较有锁编程,无锁编程代码即便在某线程意外终止,也仅仅影响其持有的数据对其他数据无影响。

然而,无锁编程也有许多缺点:

  • 编写难度大。必须格外谨慎;

  • 能够避免死锁,但是可能出现活锁,即线程间执行到某一处都需要更改同一数据,于是反复循环,又同时到达该处,如此反复。这种情况非常少见,但仍然可能降低代码效率;

  • 同步数据时可能缓存乒乓。即多个独立CPU核心频繁轮流读写同一数据,使各个CPU所属的缓存不断地切如、切出(回写),导致严重性能问题。

下面利用无锁编程实现一个栈和一个队列,尽量体现无锁编程的优点,避免其缺点。

实现线程安全的栈

用无锁编程实现栈需要用到特别多的trick:

  • 使用compare_and_exchange。这没得说,使用原子类型的compare_and_exchange函数检查是否有其他线程正在修改操作的对象,如果修改了,则改变为修改的值,再进行比较,直到没有修改,就给当前对象赋值;

  • 使用内外两个引用计数。非常巧妙的内存管理方式。在弹栈时,最大的困难就是防止内存泄露,因为即便弹出栈了,我们也不知道还有没有其他线程在读取该节点,也就不知道何时该删除次节点。因此必须自己实现一个类似垃圾回收的机制。

  • 内存次序。为了最大限度提升并发,在完成所有功能后,我们要尽可能地放宽原子操作的内存次序,当然前提是保证代码功能符合预期。这就需要对代码间的关系进行分析。

实现线程安全的队列

用无锁编程实现队列和上面实现锁的方式类似,关键也是使用两个引用计数来做内存管理,代码如下:


//
// an implementation of a lock free queue. It is similar to the implementation of the lock free stack. Both of them use
// external and internal reference count to manage memory.
//

#include <atomic>
#include <memory>

template<typename T>
class LockFreeQueue {
private:
    struct Node;
    struct CountedNodePtr {
        int externalCount;
        Node* ptr;
    };
    std::atomic<CountedNodePtr> head;
    std::atomic<CountedNodePtr> tail;

    struct NodeCounter {
        unsigned internalCount:30;
        unsigned externalCounters:2;
    };
    struct Node {

        // decrease reference count
        void releaseRef() {
            NodeCounter oldCounter = count.load(std::memory_order_relaxed);
            NodeCounter newCounter;
            do {
                newCounter = oldCounter;
                --newCounter.internalCount;
            } while (!count.compare_exchange_strong(oldCounter, newCounter, std::memory_order_acquire, std::memory_order_relaxed));
            if (!newCounter.internalCount && !newCounter.externalCounters) {
                delete this;
            }
        }

        std::atomic<T*> data;
        std::atomic<NodeCounter> count;
        CountedNodePtr next;
        Node() {
            NodeCounter newCount;
            newCount.internalCount = 0;
            newCount.externalCounters = 2;
            count.store(newCount);
            next.ptr = nullptr;
            next.externalCount = 0;
        }
    };

    // increase reference count of counter
    static void increaseExternalCount(std::atomic<CountedNodePtr>& counter, CountedNodePtr& oldCounter) {
        CountedNodePtr newCounter;
        do {
            newCounter = oldCounter;
            ++newCounter.externalCount;
        } while (!counter.compare_exchange_strong(oldCounter, newCounter, std::memory_order_acquire, std::memory_order_relaxed));
        oldCounter.externalCount = newCounter.externalCount;
    }

    // add external reference count to internal, delete external counter if it decreases to zero.
    static void freeExternalCounter(CountedNodePtr& oldNodePtr) {
        Node* const ptr = oldNodePtr.ptr;
        int const countIncrease = oldNodePtr.externalCount - 2;
        NodeCounter oldCounter = ptr->count.load(std::memory_order_relaxed);
        NodeCounter newCounter;
        do {
            newCounter = oldCounter;
            --newCounter.externalCounters;
            newCounter.internalCount += countIncrease;
        } while (!ptr->count.compare_exchange_strong(oldCounter, newCounter, std::memory_order_acquire, std::memory_order_relaxed));
        if (!newCounter.internalCount && !newCounter.externalCounters) {
            delete ptr;
        }
    }

public:
    LockFreeQueue () {
        CountedNodePtr newNext; 
        newNext.ptr = new Node;
        newNext.externalCount = 1;
        head.store(newNext);
        tail.store(head);
    }

    void push (T newValue) {
        auto newData = std::make_unique<T>(newValue);
        CountedNodePtr newNext;
        newNext.ptr = new Node;
        newNext.externalCount = 1;
        CountedNodePtr oldTail = tail.load();
        for (;;) {
            increaseExternalCount(tail, oldTail);
            T* oldData = nullptr;

            // push data to the dummy tail node, the make newNext be new dummy tail
            if (oldTail.ptr->data.compare_exchange_strong(oldData, newData.get())) {
                oldTail.ptr->next = newNext;
                oldTail = tail.exchange(newNext);
                freeExternalCounter(oldTail);
                newData.release();
                break;
            }
            oldTail.ptr->releaseRef();
        }
    }

    std::unique_ptr<T> pop() {
        CountedNodePtr oldHead = head.load(std::memory_order_relaxed);
        for (;;) {
            increaseExternalCount(head, oldHead);
            Node* const ptr = oldHead.ptr;

            // this means queue has only a dummy tail. It is empty.
            if (ptr == tail.load().ptr) {
                ptr->releaseRef();
                return std::unique_ptr<T>();
            }

            // exchange head to head->next
            if (head.compare_exchange_strong(oldHead, ptr->next)) {
                T* const res = ptr->data.exchange(nullptr);
                freeExternalCounter(oldHead);
                return std::unique_ptr<T>(res);
            }
            ptr->releaseRef();
        }
    }
};

参考文献