Monday, August 13, 2012

Lock Free Work Stealing Queue


Lock-free data structures are very handy for concurrent programs, especially as the number of cores increases. Their benefit is that they never acquire a lock. Instead they do a form of spinning: a thread takes a local copy of the shared data, computes the new value, and publishes it with one of the processor's atomic operations (like "compare and swap" or "fetch and add") only if the global value still matches the local copy; otherwise it repeats the whole procedure. Almost every lock-free data structure follows this pattern, so you may ask: where's the benefit in that? Well, chances are that the race on the data will not force a retry at all and everything will finish in a single loop cycle, which makes the operation very fast, whereas with a lock every thread always acquires and holds it while the other threads queue up.
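To make the pattern concrete, here is a minimal sketch of the "copy, compute, compare and swap, retry" loop applied to a plain counter. The names `CasCounter`, `AddWithCas` and `Run` are made up for this illustration and are not part of the queue below; only `Interlocked.CompareExchange` is the real .NET call.

```csharp
using System;
using System.Threading;

public static class CasCounter
{
    private static int total;

    // Adds 'amount' to 'location' without a lock: take a local copy,
    // compute the new value, and publish it only if nobody changed
    // the shared value in the meantime; otherwise start over.
    public static int AddWithCas(ref int location, int amount)
    {
        while (true)
        {
            int seen = location;              // local copy
            int desired = seen + amount;      // new value based on the copy
            if (Interlocked.CompareExchange(ref location, desired, seen) == seen)
                return desired;               // the swap happened, we are done
            // another thread won the race, so we spin and retry
        }
    }

    // Runs 'threads' threads that each add 1 'perThread' times.
    public static int Run(int threads, int perThread)
    {
        total = 0;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++)
        {
            workers[t] = new Thread(() =>
            {
                for (int n = 0; n < perThread; n++) AddWithCas(ref total, 1);
            });
            workers[t].Start();
        }
        foreach (Thread w in workers) w.Join();
        return total;
    }

    public static void Main()
    {
        // No update is lost, so the total equals threads * perThread.
        Console.WriteLine(CasCounter.Run(4, 100000));
    }
}
```

When there is little contention the loop body runs exactly once, which is the fast path described above.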

These types of data structures are very hard to implement correctly, as we need to think about the other threads: where they are and what they are doing. To make things even harder, these structures can suffer from the ABA problem, where a thread reads a memory location twice, but between the two reads some other thread changes the value and then changes it back (typically by reusing the location). The thread that was switched out only holds the memory address, so it sees the same address, assumes the data is unchanged, and corrupts the state. Luckily for us, in .NET the ABA problem does not arise for freshly allocated nodes like the ones used here, as the garbage collector reuses a memory location only when there are no references left to it.
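The ABA scenario is easier to follow when the interleaving is replayed step by step. The sketch below is a deterministic, single-threaded simulation on a small CAS-based stack (not the queue from this post), where the same node object is deliberately pushed back to provoke the problem. All names (`AbaDemo`, `StackNode`, `Push`, `Pop`, `Run`) are hypothetical.

```csharp
using System;
using System.Threading;

public class StackNode
{
    public int Value;
    public StackNode Next;
}

public static class AbaDemo
{
    private static StackNode top;

    private static void Push(StackNode n)
    {
        StackNode old;
        do { old = top; n.Next = old; }
        while (Interlocked.CompareExchange(ref top, n, old) != old);
    }

    private static StackNode Pop()
    {
        StackNode old, next;
        do
        {
            old = top;
            if (old == null) return null;
            next = old.Next;
        }
        while (Interlocked.CompareExchange(ref top, next, old) != old);
        return old;
    }

    // Replays the interleaving; returns whether the stale CAS succeeded.
    public static bool Run(out int topValue)
    {
        top = null;
        StackNode a = new StackNode { Value = 1 };
        StackNode b = new StackNode { Value = 2 };
        StackNode c = new StackNode { Value = 3 };
        Push(c); Push(b); Push(a);            // stack: A -> B -> C

        // "Thread 1" starts a pop, reads top and next, then gets switched out.
        StackNode seenTop = top;              // A
        StackNode seenNext = seenTop.Next;    // B

        // "Thread 2" runs meanwhile: pops A and B, then pushes the SAME A back.
        StackNode poppedA = Pop();
        Pop();
        Push(poppedA);                        // stack: A -> C

        // "Thread 1" resumes: top is A again, so its stale CAS succeeds
        // and installs B, a node that was already removed from the stack.
        bool swapped = Interlocked.CompareExchange(ref top, seenNext, seenTop) == seenTop;
        topValue = top.Value;
        return swapped;
    }

    public static void Main()
    {
        int value;
        bool swapped = Run(out value);
        Console.WriteLine("stale CAS succeeded: {0}, top value: {1}", swapped, value);
    }
}
```

The corruption only happens because node A is reused while "thread 1" still refers to it. If every push allocates a fresh node, the stale comparison fails and the operation simply retries.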

Work Stealing

So where does work stealing come into play here? Well, a work stealing queue is simply a queue where the main thread dequeues from the head while the stealing threads dequeue elements from the tail. In a standard locking queue this helps a lot, as the stealing thread does not need to take the lock when dequeuing if the head and tail are far away from each other; if they are close, a lock can be taken for extra safety. So almost by design this queue is supposed to be almost lock free. But what does it give us when our queue uses no locks at all and just does a form of spinning? Well, the benefits are almost the same: a steal does not compete with dequeues at the head, which makes it a lot less likely that our operation will last more than one loop cycle. There is one difference, however: a steal races with the enqueuing threads, so it may slow down the enqueue operation, but the enqueuing thread will not slow down our own stealing operation.
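For comparison, here is a minimal sketch of the two-ended access pattern in its lock-based form. For simplicity it takes one lock for every operation, so it does not include the "skip the lock when head and tail are far apart" optimization mentioned above. The class and method names (`LockedWorkStealingQueue`, `TryDequeue`, `TrySteal`) are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;

public class LockedWorkStealingQueue<T>
{
    private readonly LinkedList<T> items = new LinkedList<T>();
    private readonly object gate = new object();

    public void Enqueue(T item)
    {
        lock (gate) items.AddLast(item);
    }

    // The owning thread takes work from the head.
    public bool TryDequeue(out T item)
    {
        lock (gate)
        {
            if (items.Count == 0) { item = default(T); return false; }
            item = items.First.Value;
            items.RemoveFirst();
            return true;
        }
    }

    // A stealing thread takes work from the tail.
    public bool TrySteal(out T item)
    {
        lock (gate)
        {
            if (items.Count == 0) { item = default(T); return false; }
            item = items.Last.Value;
            items.RemoveLast();
            return true;
        }
    }
}

public static class LockedQueueDemo
{
    public static void Main()
    {
        LockedWorkStealingQueue<int> q = new LockedWorkStealingQueue<int>();
        q.Enqueue(1); q.Enqueue(2); q.Enqueue(3);

        int owned, stolen;
        q.TryDequeue(out owned);   // takes 1 from the head
        q.TrySteal(out stolen);    // takes 3 from the tail
        Console.WriteLine("owner got {0}, thief got {1}", owned, stolen);
    }
}
```

The lock-free version below keeps the same two entry points (`Dequeue` and `DequeueLast`) but replaces the lock with compare-and-swap loops.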

So let's go into the code. Again, most of the explanation of how it works is in the comments in the code, as I just feel that's the better way :)

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Represents a hybrid queue that uses non-blocking (spinning) techniques to achieve thread safety.
/// </summary>
/// <remarks>
/// The queue is hybrid because it can also dequeue the last node, which makes it
/// perfect for a certain set of algorithms, like work stealing.
/// </remarks>
/// <typeparam name="T">The type of the elements in the queue.</typeparam>
public class LockFreeWorkStealingQueue<T> : IEnumerable<T>
{
    /// <summary>
    /// Internal node class for the use of internal double linked list structure.
    /// </summary>
    private class Node
    {
        public T val;
        public volatile Node next;
        public volatile Node prev;
        public int id;
    }

    /*
    * You may ask yourself why volatile is here. The main reason
    * is that without explicit locking we don't get the memory
    * barrier guarantees, so instructions might get reordered.
    *
    * NOTE: making these fields volatile is just a workaround that
    * costs performance; ideally we would place memory barriers
    * only where they are actually needed!
    */
    private volatile Node head;
    private volatile Node tail;

    /// <summary>
    /// Initializes a new instance of a hybrid queue.
    /// </summary>
    public LockFreeWorkStealingQueue()
    {
        // A single sentinel node serves as both head and tail of the empty queue.
        Node sentinel = new Node();
        head = sentinel;
        tail = sentinel;
    }

    /// <summary>
    /// Gets the unsafe count (a count that will not necessarily reflect the actual value).
    /// </summary>
    /// <remarks>
    /// This property is very handy when trying to issue a steal depending on a certain window.
    /// </remarks>
    public int UnsafeCount
    {
        get
        {
            return tail.id - head.id;
        }
    }

    /// <summary>
    /// Gets the count.
    /// </summary>
    public int Count
    {
        get
        {
            int count = 0;
            EvaluateCount((x) => false, out count);
            return count;
        }
    }

    /// <summary>
    /// Starts counting nodes until a certain condition has been met.
    /// </summary>
    /// <param name="value">the condition.</param>
    /// <returns>a value indicating whether the condition was met.</returns>
    public bool EvaluateCount(Predicate<int> value)
    {
        int count = 0;
        return EvaluateCount(value, out count);
    }

    /// <summary>
    /// Starts counting nodes until a certain condition has been met.
    /// </summary>
    /// <param name="value">the condition.</param>
    /// <param name="actualCount">the actual counted number of elements.</param>
    /// <returns>a value indicating whether the condition was met.</returns>
    private bool EvaluateCount(Predicate<int> value, out int actualCount)
    {
        int count = 0;
        for (Node current = head.next;
            current != null; current = current.next)
        {
            count++;

            if (value(count))
            {
                actualCount = count;
                return true;
            }
        }
        actualCount = count;
        return false;
    }

    /// <summary>
    /// Gets a value indicating whether the queue is empty.
    /// </summary>
    public bool IsEmpty
    {
        get { return head.next == null; }
    }

    /// <summary>
    /// Gets the tail.
    /// </summary>
    /// <remarks>
    /// In order to achieve correctness we need to keep track of the tail.
    /// Accessing tail.next will not do, as some other thread might have just moved it,
    /// so in order to catch the tail we need to do a subtle form of spinning
    /// that uses the compare-and-set atomic instruction (Interlocked.CompareExchange)
    /// and advances the tail if it has been moved.
    /// </remarks>
    /// <returns>Tail.</returns>
    private Node GetTail()
    {
        Node localTail = tail;
        Node localNext = localTail.next;

        //if some other thread moved the tail we need to set it to the right position.
        while (localNext != null)
        {
            //set the tail.
            Interlocked.CompareExchange(ref tail, localNext, localTail);
            localTail = tail;
            localNext = localTail.next;
        }

        return tail;
    }

    /// <summary>
    /// Attempts to reset the counter id.
    /// </summary>
    private void TryResetCounter()
    {
        if (tail.id >= Int16.MaxValue)
        {
            int res = (tail.id - head.id);
            head.id = 0;
            tail.id = res;
        }
    }

    /// <summary>
    /// Puts a new item on the Queue.
    /// </summary>
    /// <param name="obj">The value to be queued.</param>
    public void Enqueue(T obj)
    {
        Node localTail = null;
        Node newNode = new Node();
        newNode.val = obj;

        TryResetCounter();

        do
        {
            //get the tail.
            localTail = GetTail();

            //TODO: This should be atomic.
            newNode.next = localTail.next;
            newNode.id = localTail.id + 1;
            newNode.prev = localTail;
        }
        // if the result isn't null, then this means that some other
        // thread interfered with our plans and we need to
        // start over.
        while (Interlocked.CompareExchange(
            ref localTail.next, newNode, null) != null);
        // if we finally are at the tail and we are the same,
        // then we switch the values to the new node, phew! :)
        Interlocked.CompareExchange(ref tail, newNode, localTail);
    }

    /// <summary>
    /// Gets the first element in the queue.
    /// </summary>
    /// <returns>Head element.</returns>
    public T Dequeue()
    {
        // keep spinning until we catch the proper head.
        while (true)
        {
            Node localHead = head;
            Node localNext = localHead.next;
            Node localTail = tail;

            // if the queue is empty then return the default for that
            // typeparam.
            if (localNext == null)
            {
                return default(T);
            }
            else if (localHead == localTail)
            {
                // our tail is lagging behind, so we need to swing it
                // forward to the node that follows it.
                Interlocked.CompareExchange(ref tail, localNext, localTail);
            }
            else
            {
                localNext.prev = localHead.prev;

                // if no other thread changed the head then we are good to
                // go and we can return the local value;
                if (Interlocked.CompareExchange(
                    ref head, localNext, localHead) == localHead)
                {
                    return localNext.val;
                }
            }
        }
    }

    /*
    * Instead of a sexy name like 'Steal' I like to name it by its function,
    * meaning we are dequeuing the LAST item! :P
    */

    /// <summary>
    /// Gets the last element in the queue.
    /// </summary>
    /// <returns>old tail element.</returns>
    public T DequeueLast()
    {
        Node localTail;
        Node localPrev;
        Node swapNode = new Node();

        do
        {
            //get the tail.
            localTail = GetTail();
            localPrev = localTail.prev;

            if (localPrev == null)
                return default(T);
            else if (localPrev.prev == null)
                return default(T);
            else if (localPrev.prev == head)
                return default(T);
            else if (localTail == null)
                return default(T);

            // Set the swap node values that will exchange the element
            // in a sense that it will skip right through it.
            swapNode.next = localTail.next;
            swapNode.prev = localPrev.prev;
            swapNode.val = localPrev.val;
            swapNode.id = localPrev.id;
        }
        // In order for this to be actually *thread safe* we need to subscribe ourselves
        // to the same logic as the enqueue and create a blockade by setting the next value
        // of the tail!
        while (Interlocked.CompareExchange(ref localTail.next, localTail, null) != null);


        // do a double exchange; if we get interrupted in between we should still be fine, as
        // all we need to do after the first exchange is to swing the prev element to point at the
        // correct tail.
        Interlocked.CompareExchange(ref tail, swapNode, tail);
        Interlocked.CompareExchange(ref tail.prev.next, swapNode, tail.prev.next);

        return localTail.val;
    }

    /// <summary>
    /// Tries to peek the next value in the queue without
    /// getting it out.
    /// </summary>
    /// <param name="value">the output value.</param>
    /// <returns>the value indicating that there are still values to be peeked.</returns>
    public bool TryPeek(out T value)
    {
        Node currentNode = head.next;

        if (currentNode == null)
        {
            value = default(T);
            return false;
        }
        else
        {
            value = currentNode.val;
            return true;
        }
    }

    /// <summary>
    /// Gets the enumerator.
    /// </summary>
    /// <returns>enumerator.</returns>
    public IEnumerator<T> GetEnumerator()
    {
        Node currentNode = head.next;
        Node localTail = GetTail();

        while (currentNode != null)
        {
            yield return currentNode.val;

            if (currentNode == localTail)
                break;

            currentNode = currentNode.next;
        }
    }

    /// <summary>
    /// Gets the enumerator.
    /// </summary>
    /// <returns>enumerator.</returns>
    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }
}

Performance

Let's do some performance tests for the solution. Please note that these tests were conducted on an i7 system with four physical cores. The tests only do a basic enqueue/dequeue test, as it's impossible to compare the performance of work stealing in general; that should be tested individually, depending on the use case of such a structure. The test code simply inserts into and removes from the queue from multiple threads at the same time. The final count is our correctness indicator, as it needs to be the same as the initial count when we started the program.

    static ManualResetEvent ev = new ManualResetEvent(false);
    static int enqTh = 100; //enqueue threads
    static int deqTh = 100; //dequeue threads
    static int items = 100000; //items to deq/enq
    static int count = 0; //countdown latch


    static void LockFreeTest()
    {
        Console.WriteLine("Lock Free Test");

        LockFreeWorkStealingQueue<int> q = new LockFreeWorkStealingQueue<int>();

        //insert some initial items.
        for (int i = 0; i < items * enqTh; i++ ) q.Enqueue(i);

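        //reset the shared latch state so both tests can run in one process.
        count = 0;
        ev.Reset();

        Stopwatch w = new Stopwatch();
        w.Start();

        for (int i = 0; i < enqTh; i++)
        {
            int id = i; //copy, otherwise every lambda captures the same loop variable.
            ThreadPool.QueueUserWorkItem((x) =>
                { for (int n = 0; n < items; n++) q.Enqueue(id); if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set(); });
        }

        for (int i = 0; i < deqTh; i++)
        {
            //count++ is not atomic across threads, so the latch uses Interlocked.Increment.
            ThreadPool.QueueUserWorkItem((x) =>
            { for (int n = 0; n < items; n++) q.Dequeue(); if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set(); });
        }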

        ev.WaitOne();
        w.Stop();

        Console.WriteLine("Count: {0}" ,q.Count);
        Console.WriteLine("Time it took: {0}", w.ElapsedMilliseconds);
    }

    static void LockingTest()
    {
        Console.WriteLine("Locking Test");

        Queue<int> q = new Queue<int>();

        //insert some initial items.
        for (int i = 0; i < items * enqTh; i++) q.Enqueue(i);

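        //reset the shared latch state so both tests can run in one process.
        count = 0;
        ev.Reset();

        Stopwatch w = new Stopwatch();
        w.Start();

        for (int i = 0; i < enqTh; i++)
        {
            int id = i; //copy, otherwise every lambda captures the same loop variable.
            ThreadPool.QueueUserWorkItem((x) =>
            { for (int n = 0; n < items; n++) lock(q) q.Enqueue(id); if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set(); });
        }

        for (int i = 0; i < deqTh; i++)
        {
            //count++ is not atomic across threads, so the latch uses Interlocked.Increment.
            ThreadPool.QueueUserWorkItem((x) =>
            { for (int n = 0; n < items; n++) lock(q) q.Dequeue(); if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set(); });
        }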

        ev.WaitOne();
        w.Stop();

        Console.WriteLine("Count: {0}", q.Count);
        Console.WriteLine("Time it took: {0}", w.ElapsedMilliseconds);
    }


Lock free queue took:



Standard queue took:


Conclusion

Now, if those results seem somewhat strange to you, let me explain. The performance benefits will only be visible on a system with a high core/processor count, as only then do processors have to synchronize while waiting on a lock acquired by a thread on another processor. So on a low core count system you will be better off with a standard locking (work stealing) queue.


Update On Performance

As correctly pointed out by Sławomir in the comments section, the performance test is actually wrong for a couple of reasons. The first is that the ThreadPool uses work stealing algorithms itself, so it will run on NumOfCores * 4 threads instead of the requested number; this is especially true if the tests were performed on .NET 4.0. Besides, the pool's work stealing algorithm will obscure the results, as its threads will steal queued tasks. The correct solution to the problem is to use plain threads, so the test code was changed to reflect that:

    static ManualResetEvent ev = new ManualResetEvent(false);
    static int enqTh = 1000; //enqueue threads
    static int deqTh = 1000; //dequeue threads
    static int enqItems = 10000;
    static int deqItems = 10000;
    static int count = 0; //countdown latch
    static int startItems = 10000000;
    static object locker = new object();

    static void LockFreeTest()
    {
        Console.WriteLine("Lock Free Test");

        LockFreeWorkStealingQueue<int> q = new LockFreeWorkStealingQueue<int>();

        //insert some initial items.
        for (int i = 0; i < startItems; i++) q.Enqueue(i);

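        Thread[] enqThreads = new Thread[enqTh];
        Thread[] deqThreads = new Thread[deqTh];

        //reset the shared latch state so both tests can run in one process.
        count = 0;
        ev.Reset();

        for (int i = 0; i < enqTh; i++)
        {
            int id = i; //copy, otherwise every lambda captures the same loop variable.
            enqThreads[i] = new Thread(() =>
                {
                    for (int n = 0; n < enqItems; n++) q.Enqueue(id);
                    if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set();
                });
        }

        for (int i = 0; i < deqTh; i++)
        {
            deqThreads[i] = new Thread(() =>
                {
                    for (int n = 0; n < deqItems; n++) q.Dequeue();
                    //count++ is not atomic across threads, so the latch uses Interlocked.Increment.
                    if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set();
                });
        }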

        Stopwatch w = new Stopwatch();
        w.Start();

        Array.ForEach(enqThreads, (x) => x.Start());
        Array.ForEach(deqThreads, (x) => x.Start());
        Array.ForEach(enqThreads, (x) => x.Join());
        Array.ForEach(deqThreads, (x) => x.Join());

        ev.WaitOne();
        w.Stop();

        Console.WriteLine("Count: {0}" ,q.Count);
        Console.WriteLine("Time it took: {0}", w.ElapsedMilliseconds);
    }

    static void LockingTest()
    {
        Console.WriteLine("Locking Test");

        Queue<int> q = new Queue<int>();

        //insert some initial items.
        for (int i = 0; i < startItems; i++) q.Enqueue(i);

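        Thread[] enqThreads = new Thread[enqTh];
        Thread[] deqThreads = new Thread[deqTh];

        //reset the shared latch state so both tests can run in one process.
        count = 0;
        ev.Reset();

        for (int i = 0; i < enqTh; i++)
        {
            int id = i; //copy, otherwise every lambda captures the same loop variable.
            enqThreads[i] = new Thread(() =>
            {
                for (int n = 0; n < enqItems; n++) lock (locker) q.Enqueue(id);
                if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set();
            });
        }

        for (int i = 0; i < deqTh; i++)
        {
            deqThreads[i] = new Thread(() =>
            {
                for (int n = 0; n < deqItems; n++) lock (locker) q.Dequeue();
                //count++ is not atomic across threads, so the latch uses Interlocked.Increment.
                if (Interlocked.Increment(ref count) == enqTh + deqTh) ev.Set();
            });
        }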

        Stopwatch w = new Stopwatch();
        w.Start();

        Array.ForEach(enqThreads, (x) => x.Start());
        Array.ForEach(deqThreads, (x) => x.Start());
        Array.ForEach(enqThreads, (x) => x.Join());
        Array.ForEach(deqThreads, (x) => x.Join());

        ev.WaitOne();
        w.Stop();

        Console.WriteLine("Count: {0}", q.Count);
        Console.WriteLine("Time it took: {0}", w.ElapsedMilliseconds);
    }
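Regarding the thread pool concern that motivated this update: a quick way to see what the pool is configured for on a given machine is to query its limits. This is only a sketch; the class and method names (`PoolInfo`, `Query`) are made up, while `ThreadPool.GetMinThreads`, `ThreadPool.GetMaxThreads` and `Environment.ProcessorCount` are the standard .NET calls. The numbers it prints depend on the runtime version and the machine, and they are limits, not the number of threads actually running at a given moment.

```csharp
using System;
using System.Threading;

public static class PoolInfo
{
    // Returns { processor count, min worker threads, max worker threads }.
    public static int[] Query()
    {
        int minWorker, minIo, maxWorker, maxIo;
        ThreadPool.GetMinThreads(out minWorker, out minIo);
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        return new int[] { Environment.ProcessorCount, minWorker, maxWorker };
    }

    public static void Main()
    {
        int[] info = Query();
        Console.WriteLine("Processors:         {0}", info[0]);
        Console.WriteLine("Min worker threads: {0}", info[1]);
        Console.WriteLine("Max worker threads: {0}", info[2]);
    }
}
```

Since the pool grows and shrinks between these limits on its own schedule, queuing 100 work items does not mean 100 threads hit the queue at once, which is why the tests above use dedicated threads.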

I extended the test procedure and also tested scenarios with a high enqueue to dequeue ratio and the other way around, as it is rare for most systems to have the same number of enqueues and dequeues at any given point in time; usually it will vary. I provide a table of results for each test scenario, for the given number of items and threads, along with a graph showing the performance. Let's look at the performance of both solutions when we have the same number of dequeue and enqueue threads:

For 1k and 10k items:

Elapsed time in milliseconds; the columns are the number of threads.

Queue     Items     10     20     40     80    100    200    400    800   1000
LockFree   1000    142    210    337    743    999   1885   3304   3500   3594
Locking    1000     96    135    483   1037   1043   1628   2600   2638   2760
LockFree  10000    254    499    828   1907   2327   4922   9940  18637  23549
Locking   10000    257    481    890   2386   2888   5005   9505  17900  22900




As we can see, the performance of the lock-free solution on my machine (4 physical cores) is almost on par with the locking queue, and there is a pattern here: the lock-free solution outperforms the locking code (by a hair) when the number of threads is between 40 and 100. For better results from the lock-free code we would need to redo this test on a machine with more cores (8 to 10 physical cores), but unfortunately I have no access to such a machine :(. Now let's test the performance when we have a varying number of enqueue and dequeue threads.

For 10k items and varying number of dequeue threads:


Elapsed time in milliseconds; d = dequeue threads, e = enqueue threads.

Queue     Items   10d/10e   20d/10e   40d/10e   80d/10e  100d/10e  200d/10e  400d/10e  800d/10e 1000d/10e
Locking   10000       234       419       532      1210      1414      2364      4876     10257     13067
LockFree  10000       190       393       524       958      1181      2391      4528      9141     10878




For 10k items and varying number of enqueue threads:

Elapsed time in milliseconds; d = dequeue threads, e = enqueue threads.

Queue     Items   10d/10e   10d/20e   10d/40e   10d/80e  10d/100e  10d/200e  10d/400e  10d/800e 10d/1000e
Locking   10000       201       429       797      1369      1408      2703      5063      9916     12917
LockFree  10000       168       416       587      1184      1300      2426      4867     10645     13267




Here we do see that the lock-free queue outperforms the locking version of the queue in most configurations, but again, on a low core count machine the gain is small.
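To put a number on "most configurations", the small sketch below recomputes the comparison from the two tables above (10k items, elapsed milliseconds copied as published). The helper names (`TableGain`, `CountWins`, `GainPercent`) are made up for this example.

```csharp
using System;

public static class TableGain
{
    // Elapsed milliseconds copied from the two 10k-item tables above.
    public static readonly int[] LockingVaryDeq  = { 234, 419, 532, 1210, 1414, 2364, 4876, 10257, 13067 };
    public static readonly int[] LockFreeVaryDeq = { 190, 393, 524, 958, 1181, 2391, 4528, 9141, 10878 };
    public static readonly int[] LockingVaryEnq  = { 201, 429, 797, 1369, 1408, 2703, 5063, 9916, 12917 };
    public static readonly int[] LockFreeVaryEnq = { 168, 416, 587, 1184, 1300, 2426, 4867, 10645, 13267 };

    // Number of columns in which the lock-free queue was strictly faster.
    public static int CountWins(int[] locking, int[] lockFree)
    {
        int wins = 0;
        for (int i = 0; i < locking.Length; i++)
            if (lockFree[i] < locking[i]) wins++;
        return wins;
    }

    // Time saved by the lock-free queue relative to the locking queue, in percent.
    // A negative value means the lock-free queue was slower.
    public static double GainPercent(int locking, int lockFree)
    {
        return 100.0 * (locking - lockFree) / locking;
    }

    public static void Main()
    {
        Console.WriteLine("Varying dequeue threads: lock free faster in {0} of {1} columns",
            CountWins(LockingVaryDeq, LockFreeVaryDeq), LockingVaryDeq.Length);
        Console.WriteLine("Varying enqueue threads: lock free faster in {0} of {1} columns",
            CountWins(LockingVaryEnq, LockFreeVaryEnq), LockingVaryEnq.Length);

        for (int i = 0; i < LockingVaryDeq.Length; i++)
            Console.WriteLine("column {0}: {1:F1}% (vary deq), {2:F1}% (vary enq)", i + 1,
                GainPercent(LockingVaryDeq[i], LockFreeVaryDeq[i]),
                GainPercent(LockingVaryEnq[i], LockFreeVaryEnq[i]));
    }
}
```

With the published numbers the lock-free queue is faster in 8 of 9 columns when the dequeue threads vary and in 7 of 9 when the enqueue threads vary; the two columns it loses are the ones with 800 and 1000 enqueue threads.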

Update On Conclusion

So should we bother with a lock-free solution? If we have a heavily loaded system that runs on many cores, then yes! Besides, this implementation is a work stealing queue, whose primary role is to boost performance on a loaded system by letting lightly loaded threads steal work from other queues. The real question, however, is whether we should go for a lock-based work stealing queue or go lock free all the way. Well, in lock-based work stealing we would take part in the enqueue lock when stealing, and in the lock-free version we would still take part in the same pseudo lock, but when the enqueue rate is low at that moment there is a big chance that we will not spin at all, and as a result the whole steal operation will be faster than acquiring the lock and waiting on it (especially as the number of cores goes up). Another thing is that very short locks in .NET start out as spin locks internally, so they usually do not switch to the operating system, and the performance of such short locks is very good (I will try to shed more light on that topic in future posts). So the answer to the question is: go lock free if the production system runs on a high core count machine, and go lock based if you cannot spare cores.
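As a small illustration of the spin lock idea mentioned above, here is a sketch that uses the explicit `System.Threading.SpinLock` type to guard a very short critical section. Note that this is a different mechanism from the spinning that the `lock` statement may do internally; it is shown only to make "spin instead of switching to the operating system" concrete. The names `SpinLockSketch` and `Run` are made up.

```csharp
using System;
using System.Threading;

public static class SpinLockSketch
{
    // SpinLock is a struct, so the field must not be readonly.
    private static SpinLock gate = new SpinLock(false);
    private static int shared;

    // Runs 'threads' threads that each increment the shared value 'perThread' times.
    public static int Run(int threads, int perThread)
    {
        shared = 0;
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++)
        {
            workers[t] = new Thread(() =>
            {
                for (int n = 0; n < perThread; n++)
                {
                    bool taken = false;
                    try
                    {
                        gate.Enter(ref taken);   // spins instead of blocking
                        shared++;                // keep the critical section tiny
                    }
                    finally
                    {
                        if (taken) gate.Exit();
                    }
                }
            });
            workers[t].Start();
        }
        foreach (Thread w in workers) w.Join();
        return shared;
    }

    public static void Main()
    {
        Console.WriteLine(Run(4, 50000));
    }
}
```

Spinning only pays off when the lock is held for a handful of instructions; for longer critical sections a regular `lock` is the safer default.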


2 comments:

Sławomir Bryś said...

Interesting article, however I'm just curious, do you know how many threads were created during your tests on thread pool?
According to MSDN it may vary: http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx
Maybe you could check it and put into article?

Bartosz Adamczewski said...

@Sławomir Bryś:

You're right, and I'm actually pretty stupid, as depending on the platform a work stealing thread pool could have been used and the test would be totally screwed (it might be).

I will redo the tests using standard threads and will add that to this article.

Thanks for pointing that out.