Monday, May 7, 2012

Work Stealing

When working with multithreaded applications we tend to spawn worker threads, which leads to inefficient code when not done correctly; for example, threads are not reused and get recreated for every work item. The usual solution is a thread pool, which reuses threads and queues work items that those threads consume. While simple, this design has a side effect: unless the pool's queue is lock-free (or immutable), it has to be locked on every Enqueue and Dequeue, so when the pool runs a fair number of threads on lightweight tasks, a large share of the time is spent waiting on that lock.
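To make the contention concrete, here is a minimal sketch of the classic single-queue pool described above. `SharedQueuePool` is a hypothetical name used only for illustration; it is not part of the article's code. Every producer and every worker goes through the same `gate` lock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical classic pool: one shared queue guarded by one lock.
public sealed class SharedQueuePool : IDisposable
{
    private readonly Queue<Action> queue = new Queue<Action>();
    private readonly object gate = new object();
    private readonly Thread[] workers;
    private bool stopping;

    public SharedQueuePool( int threadCount )
    {
        workers = new Thread[ threadCount ];
        for ( int i = 0; i < threadCount; i++ )
        {
            workers[ i ] = new Thread( Work ) { IsBackground = true };
            workers[ i ].Start();
        }
    }

    public void QueueWorkItem( Action action )
    {
        lock ( gate )                  // every producer contends here...
        {
            queue.Enqueue( action );
            Monitor.Pulse( gate );     // wake one idle worker
        }
    }

    private void Work()
    {
        while ( true )
        {
            Action next;
            lock ( gate )              // ...and so does every consumer
            {
                while ( queue.Count == 0 && !stopping )
                    Monitor.Wait( gate );
                if ( queue.Count == 0 )
                    return;            // stopping and fully drained
                next = queue.Dequeue();
            }
            next();                    // run the item outside the lock
        }
    }

    // Lets the workers drain the queue, then waits for them to exit.
    public void Dispose()
    {
        lock ( gate )
        {
            stopping = true;
            Monitor.PulseAll( gate );
        }
        foreach ( Thread t in workers )
            t.Join();
    }
}
```

With many threads and tiny work items, the time spent acquiring `gate` can dominate the time spent in `next()`, which is the cost that per-thread queues try to avoid.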

One solution is to reverse the queuing logic: split the single queue into one queue per worker thread, so that each thread processes its own queue, while the pool holds the threads and assigns work to them. This improves the pool's performance because the per-thread queue locks are almost never contended, but it has one downside: a thread that has emptied its queue may sit idle while another thread is still overloaded. This is often called thread starvation. Strictly speaking the idle thread is not starved (a starved thread wants to run but is denied CPU time), but in managed user code we are effectively starving it, because it wastes time it could spend on useful work. To overcome this problem we add one simple rule: a thread that has nothing to process can steal work from other threads. This technique is called work stealing.
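The rule can be sketched as follows. `StealingPool` is a hypothetical, deliberately simplified class (work is posted before the workers start, and a worker exits once nothing is left); it is not the article's SmartThreadPool. Each worker drains its own queue and, only when that queue is empty, locks the longest other queue and takes one item from it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical per-thread-queue pool with work stealing.
public sealed class StealingPool
{
    private readonly Queue<Action>[] queues;
    private readonly Thread[] workers;
    private int remaining;   // items not yet executed
    private int steals;      // items executed by a thread other than their owner

    public int Steals { get { return Volatile.Read( ref steals ); } }

    public StealingPool( int threadCount )
    {
        queues = new Queue<Action>[ threadCount ];
        for ( int i = 0; i < threadCount; i++ )
            queues[ i ] = new Queue<Action>();
        workers = new Thread[ threadCount ];
    }

    // Assigns an item to a specific worker's queue (call before Run).
    public void Post( int worker, Action action )
    {
        lock ( queues[ worker ] ) queues[ worker ].Enqueue( action );
        Interlocked.Increment( ref remaining );
    }

    // Starts the workers and blocks until every posted item has run.
    public void Run()
    {
        for ( int i = 0; i < workers.Length; i++ )
        {
            int me = i;                                 // copy for the closure
            workers[ i ] = new Thread( () => Work( me ) );
            workers[ i ].Start();
        }
        foreach ( Thread t in workers )
            t.Join();
    }

    private bool TryDequeue( int index, out Action action )
    {
        lock ( queues[ index ] )
            action = queues[ index ].Count > 0 ? queues[ index ].Dequeue() : null;
        return action != null;
    }

    private void Work( int me )
    {
        while ( Volatile.Read( ref remaining ) > 0 )
        {
            Action action;
            if ( !TryDequeue( me, out action ) )
            {
                // Own queue is empty: pick the victim with the most queued items.
                // Counts are read without locks, so this is only a heuristic.
                int victim = -1, best = 0;
                for ( int i = 0; i < queues.Length; i++ )
                {
                    int count = queues[ i ].Count;
                    if ( i != me && count > best ) { best = count; victim = i; }
                }
                if ( victim < 0 || !TryDequeue( victim, out action ) )
                {
                    Thread.Yield();                     // nothing to steal right now
                    continue;
                }
                Interlocked.Increment( ref steals );
            }
            action();
            Interlocked.Decrement( ref remaining );
        }
    }
}
```

If all items are posted to worker 0, the other workers can only get work by stealing, so `Steals` typically ends up well above zero. Each lock protects a single queue, so contention only occurs when a thief and an owner touch the same queue.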

.NET 4.0 incorporates this technique in its thread pool: Tasks queued from pool threads go to per-thread local queues, from which idle threads can steal. Note that 4.0 ships a new, side-by-side runtime, so applications built for 2.0 or 3.5 only get the new pool when they run on the 4.0 CLR. Some sources report the new pool to be up to 2x faster than the old one, so the technique certainly pays off. With the addition of Tasks in 4.0 we have a very powerful programming model for multithreaded applications, but what if we want even more control over the pool and work stealing, or simply can't use Tasks, or both?

Smart Thread

First we need to create a thread class that implements work queues, and can post statistics and commands to the custom pool:


    using System;
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>
    /// A smart thread that incorporates work stealing techniques.
    /// </summary>
    public class SmartThread : IComparable<SmartThread>
    {
        // Number of short timed waits before an idle thread blocks indefinitely.
        private const int waitSpinLimit = 5;
        // Length of each timed wait, in milliseconds.
        private const int waitSpinTime = 30;

        private readonly Thread thread;
        internal readonly Queue<Action> scheduler;
        private readonly ManualResetEvent wait;
        internal readonly object locker = new object();

        // Only touched by the worker thread itself.
        private bool isSignalled;
        private volatile bool isPendingJoin;
        private readonly int id;
        private int executionCount;
        private long totalTime;
        private readonly bool isInitializedInPool;
        private int waitSpinCount = 0;

        /// <summary>
        /// Gets the managed thread id.
        /// </summary>
        public int ThreadId
        {
            get { return id; }
        }

        /// <summary>
        /// Gets the average task running time, in milliseconds.
        /// </summary>
        public decimal AvgTaskTime
        {
            get;
            private set;
        }

        /// <summary>
        /// Gets the total task running time, in milliseconds.
        /// </summary>
        /// <remarks>
        /// The statistics are reset once this value exceeds Int32.MaxValue.
        /// </remarks>
        public long TotalTasksTime
        {
            get { return totalTime; }
        }

        /// <summary>
        /// Gets a value indicating whether the thread was created by the thread pool.
        /// </summary>
        public bool IsInitializedInPool
        {
            get { return isInitializedInPool; }
        }

        /// <summary>
        /// An internal constructor that initializes a smart thread with a parameter
        /// indicating whether this instance lives in the thread pool.
        /// </summary>
        /// <param name="isInitializedInPool">Indicates that this instance will live in the thread pool.</param>
        internal SmartThread( bool isInitializedInPool )
        {
            this.isInitializedInPool = isInitializedInPool;
            thread = new Thread( new ThreadStart( Process ) );
            // Pool threads never leave their loop, so they must not keep the process alive.
            thread.IsBackground = isInitializedInPool;
            wait = new ManualResetEvent( false );
            scheduler = new Queue<Action>();
            id = thread.ManagedThreadId;
        }

        /// <summary>
        /// Initializes a SmartThread outside the thread pool scope; when started it is
        /// registered with the pool's artificial scheduler and only steals work from
        /// other threads created outside the pool.
        /// </summary>
        public SmartThread() : this( false ) { }

        /// <summary>
        /// The thread processing loop that consumes the queue.
        /// </summary>
        private void Process()
        {
            while ( true )
            {
                Action localAction = null;

                // If we were woken by a signal, reset the event BEFORE looking at the queue,
                // so an Execute call racing with us leaves the event set (no lost wake-up).
                if ( isSignalled )
                {
                    wait.Reset();
                    isSignalled = false;
                }

                //TODO: use reader writer locks, or consider low level locks.
                lock ( locker )
                {
                    if ( scheduler.Count != 0 )
                        localAction = scheduler.Dequeue();
                }

                if ( localAction == null )
                {
                    // Our queue is empty, so let's try to steal some work.
                    // If we stole something, don't sleep: first check our own queue and then
                    // try to steal again (a very subtle form of spin waiting).
                    if ( TryStealWork() )
                        continue;

                    if ( isPendingJoin )
                        break;

                    if ( waitSpinCount++ < waitSpinLimit )
                    {
                        // Short timed wait: wake up periodically to look for work to steal,
                        // which is more effective when many actions are being executed.
                        isSignalled = wait.WaitOne( waitSpinTime );
                    }
                    else
                    {
                        waitSpinCount = 0;
                        isSignalled = wait.WaitOne();
                    }

                    continue;
                }

                // Process the action outside the lock!
                InvokeAction( localAction );

                // Update the pool's scheduling information.
                //TODO: Make it smarter, and more secure right now there is a very small deadlock chance!!!
                SmartThreadPool.Reschedule( isInitializedInPool );
            }

            // End processing.
            IsStarted = false;
        }

        /// <summary>
        /// Invokes the given action and updates the thread statistics.
        /// </summary>
        /// <param name="localAction">The action taken from a queue.</param>
        private void InvokeAction( Action localAction )
        {
            // Start measuring time (milliseconds).
            int ticksStart = Environment.TickCount;

            // Execute the action.
            localAction();

            // Reset the statistics so that they do not overflow.
            if ( totalTime > int.MaxValue )
            {
                executionCount = 0;
                totalTime = 0;
            }

            // Increment the counters.
            executionCount++;
            totalTime += Environment.TickCount - ticksStart;
            AvgTaskTime = (decimal)totalTime / executionCount;
        }

        /// <summary>
        /// Tries to steal workload from other heavily loaded threads.
        /// </summary>
        /// <returns>A flag indicating whether the steal succeeded.</returns>
        internal bool TryStealWork()
        {
            // 1. Ask the pool for the thread with the worst stats (the most queued work).
            // 2. Lock its queue, check Count and Dequeue.
            SmartThread threadToSteal = SmartThreadPool.GetThreadToSteal( isInitializedInPool );

            // Guard against an empty scheduler (null), and against the pool pointing us at
            // ourselves, e.g. when other threads are joining: there is nothing to steal then.
            if ( threadToSteal == null || threadToSteal.id == this.id )
                return false;

            Action localAction = null;

            // Lock the queue of the thread we are stealing from.
            lock ( threadToSteal.locker )
            {
                if ( threadToSteal.scheduler.Count != 0 )
                    localAction = threadToSteal.scheduler.Dequeue();
            }

            if ( localAction == null )
                return false;

            // Run the stolen action on this thread, updating this thread's statistics.
            InvokeAction( localAction );
            return true;
        }

        /// <summary>
        /// Schedules the given action for execution.
        /// </summary>
        /// <param name="action">The action to be executed.</param>
        public void Execute( Action action )
        {
            // Enqueue under the same lock that Process and the thieves use.
            lock ( locker )
            {
                scheduler.Enqueue( action );
            }

            // Wake the worker in case it is waiting.
            wait.Set();
        }

        /// <summary>
        /// Starts the current thread.
        /// </summary>
        public void Start()
        {
            if ( isInitializedInPool == false )
                SmartThreadPool.InsertToArtificialScheduler( this );

            // Set the flag before the thread runs, so it cannot overwrite the thread's own reset.
            IsStarted = true;
            thread.Start();
        }

        /// <summary>
        /// Joins the current thread: lets it drain its queue, blocks until it exits
        /// and releases its resources.
        /// </summary>
        public void Join()
        {
            isPendingJoin = true;

            // Wake the thread in case it is blocked waiting for work.
            wait.Set();

            if ( ( thread.ThreadState & ThreadState.Unstarted ) == 0 )
            {
                thread.Join();

                if ( isInitializedInPool == false )
                    SmartThreadPool.RemoveFromArtificialScheduler( this );
            }

            wait.Close();
        }

        /// <summary>
        /// Gets the value that indicates if the thread is started.
        /// </summary>
        public bool IsStarted
        {
            get;
            private set;
        }

        // Orders threads by the number of queued items (used by the scheduler heap).
        // Actually threads should not be comparable :(
        // TODO: Create a comparable inner class.
        public int CompareTo( SmartThread other )
        {
            return this.scheduler.Count - other.scheduler.Count;
        }
    }


The two most interesting methods here are Execute and Process. Each time a thread gets a new work item (an Action), Execute queues the item and signals the thread's internal worker, which consumes the queue in Process.

After each work item is executed, the thread updates its statistics and tells the pool that its internal scheduler needs to be reordered to stay valid. The pool uses a heap to keep the threads with the fewest queued items as the preferred ones.

A thread starts stealing work once its own queue is empty. If a steal succeeds, it loops straight back to check its own queue and then tries to steal again. If the steal fails, the thread waits on its event for at most 30 ms and then looks for work again; after a set number of such attempts (five) it waits for the event to be signalled indefinitely. Under heavy load these short timed waits pay off: an idle thread is only signalled when work lands in its own queue, so waking up periodically lets it notice and steal work piling up in other threads' queues. A thread created and started by user code also ends up in the pool and takes part in scheduling and work stealing, but it is tracked by a separate scheduler reserved for user-created threads.
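The idle policy can be isolated into a small helper to make the timing explicit. `IdleBackoff` is a hypothetical refactoring, not part of the original SmartThread; it reproduces the same counter logic as `Process`: five timed waits of 30 ms (so roughly 150 ms of polling), then one indefinite wait, after which the cycle restarts.

```csharp
using System.Threading;

// Hypothetical helper extracted from SmartThread.Process's idle branch.
public static class IdleBackoff
{
    public const int WaitSpinLimit = 5;   // same values as SmartThread
    public const int WaitSpinTime = 30;   // milliseconds

    // Returns the timeout for the next wait and advances the idle counter.
    public static int NextTimeout( ref int waitSpinCount )
    {
        if ( waitSpinCount++ < WaitSpinLimit )
            return WaitSpinTime;

        waitSpinCount = 0;
        return Timeout.Infinite;          // -1: WaitOne blocks until Set() is called
    }
}
```

Inside the loop this would be used as `isSignalled = wait.WaitOne( IdleBackoff.NextTimeout( ref waitSpinCount ) );`, since `WaitOne(Timeout.Infinite)` behaves like the parameterless `WaitOne()`.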

Smart Thread Pool

Now let's look at the ThreadPool code:

using System;
using System.Threading;

/// <summary>
/// Thread pool that incorporates fair thread scheduling as well as work stealing.
/// </summary>
public static class SmartThreadPool
{
    /// <summary>
    /// Gets/Sets the thread pool debug mode.
    /// </summary>
    /// <remarks>
    /// When enabled, the thread pool logs the state of its threads
    /// each time a new work item is scheduled.
    /// </remarks>
    public static bool DebugModeOn
    {
        get;
        set;
    }

    /// <summary>
    /// Round-robin index over the best candidates (the heap root and its children).
    /// </summary>
    private static int bestThCnt;

    /// <summary>
    /// Locker object for any threads that want to put stuff in.
    /// </summary>
    private static readonly object locker = new object();

    /// <summary>
    /// Locker that prevents two threads from rebuilding the pool heap at the same time.
    /// </summary>
    private static readonly object poolHeapLocker = new object();

    /// <summary>
    /// Locker for the artificial scheduler.
    /// </summary>
    private static readonly object artificialLocker = new object();

    /// <summary>
    /// Heap that represents a priority queue of worker threads.
    /// </summary>
    private static readonly Heap<SmartThread> threadScheduler = null;

    /// <summary>
    /// Heap that represents a priority queue of worker threads that weren't created in the pool.
    /// </summary>
    private static readonly Heap<SmartThread> artificialThreadScheduler = null;

    /// <summary>
    /// A default static constructor that initializes the pool threads.
    /// </summary>
    static SmartThreadPool()
    {
        // Create an empty heap.
        artificialThreadScheduler = new Heap<SmartThread>( new SmartThread[] { } );

        // The idea here is {core_count} * 2 threads; the rest should be spawned as fibers.
        // (fiber functionality coming soon :D, keep your hopes up kids!)
        SmartThread[] threads = new SmartThread[ Environment.ProcessorCount * 2 ];

        for ( int i = 0; i < threads.Length; i++ )
            threads[ i ] = new SmartThread( true );

        threadScheduler = new Heap<SmartThread>( threads );
    }

    /// <summary>
    /// Queues a new Action on the thread pool.
    /// </summary>
    /// <param name="action">Action delegate.</param>
    public static void QueueWorkItem( Action action )
    {
        SmartThread lowestWorkloadThread = null;

        lock ( locker )
        {
            /*
            * Accessing the root and its direct children gives us better performance than just
            * accessing the root. This is because we don't lock the items during heap operations,
            * so while the heap is being reorganized we might end up with a worse thread, but
            * statistically, when this happens, better load statistics are in the root's children.
            */
            int candidates = Math.Min( 3, threadScheduler.items.Length );
            lowestWorkloadThread = threadScheduler.items[ bestThCnt % candidates ];

            // Keep the counter small so it can never overflow into a negative index.
            bestThCnt = ( bestThCnt + 1 ) % candidates;

            // Start the thread while holding the lock, so it is never started twice.
            if ( lowestWorkloadThread.IsStarted == false )
                lowestWorkloadThread.Start();
        }

        // Schedule the work item.
        lowestWorkloadThread.Execute( action );

        // For debugging only.
        if ( DebugModeOn == true )
        {
            foreach ( SmartThread thread in threadScheduler.items )
            {
                Console.Write( string.Format( "t{0:000} a:{1:000} c:{2:000}; ",
                    thread.ThreadId, thread.AvgTaskTime, thread.scheduler.Count ) );
            }

            Console.WriteLine();
        }
    }

    /// <summary>
    /// Gets the most loaded thread in the pool, to steal work from.
    /// </summary>
    /// <param name="threadPoolThread">Boolean value indicating that we want threads that
    /// were created in the thread pool.</param>
    /// <returns>SmartThread that has the most work to do.</returns>
    internal static SmartThread GetThreadToSteal( bool threadPoolThread )
    {
        // Get the element that has most of the load.
        if ( threadPoolThread )
            return threadScheduler.ReadMax();

        // The artificial heap changes size on Insert/Remove, so read it under its lock.
        lock ( artificialLocker )
        {
            return artificialThreadScheduler.ReadMax();
        }
    }

    /// <summary>
    /// Attempts to rebuild the priority queue, so that it contains correct information
    /// about priorities.
    /// </summary>
    /// <param name="threadPoolThread">Boolean value indicating that we want threads that
    /// were created in the thread pool.</param>
    internal static void Reschedule( bool threadPoolThread )
    {
        // Rebuilding is best effort: if another thread already holds the lock (rebuilding,
        // or inserting/removing artificial threads), we skip instead of blocking. This also
        // prevents two threads from reordering the same heap array concurrently.
        object heapLock = threadPoolThread ? poolHeapLocker : artificialLocker;
        Heap<SmartThread> heap = threadPoolThread ? threadScheduler : artificialThreadScheduler;

        if ( Monitor.TryEnter( heapLock ) )
        {
            try
            {
                heap.BuildHeap();
            }
            finally
            {
                Monitor.Exit( heapLock );
            }
        }
    }

    /// <summary>
    /// Inserts a thread that did not originate in the thread pool into the artificial scheduler,
    /// in order to enable fair work scheduling and work stealing.
    /// </summary>
    /// <param name="thread">SmartThread.</param>
    internal static void InsertToArtificialScheduler( SmartThread thread )
    {
        lock ( artificialLocker )
        {
            artificialThreadScheduler.Insert( thread );
        }
    }

    /// <summary>
    /// Removes the given thread that did not originate in the thread pool from the artificial scheduler.
    /// </summary>
    /// <param name="thread">SmartThread</param>
    internal static void RemoveFromArtificialScheduler( SmartThread thread )
    {
        lock ( artificialLocker )
        {
            int cnt = 0;
            int size = artificialThreadScheduler.GetSize();

            if ( size == 0 )
                return;

            SmartThread[] artificialThreadItems = new SmartThread[ size - 1 ];

            // The linear search should not be too big of a problem in this context:
            // a joining thread's queue is probably empty, so it should be
            // somewhere near the top of the tree.
            foreach ( SmartThread artificialThread in artificialThreadScheduler.items )
            {
                if ( artificialThread != null && artificialThread.ThreadId != thread.ThreadId )
                {
                    // The thread is not in the heap: nothing to remove.
                    if ( cnt == artificialThreadItems.Length )
                        return;

                    artificialThreadItems[ cnt++ ] = artificialThread;
                }
            }

            artificialThreadScheduler.SetSize( size - 1 );
            artificialThreadScheduler.items = artificialThreadItems;

            // Removing an element can break the heap property, so restore it.
            artificialThreadScheduler.BuildHeap();
        }
    }
}

All the thread pool does is use the Heap structure to prioritize threads for new work items. The heap could be put behind an interface to support many different scheduling policies, but I leave that to the reader, as it would cloud the implementation for the purpose of this article. The scheduler's rule is that threads with less to do have a higher priority, so they receive the new work items. This is not the model in which work stealing shines: since each new item goes to one of the least-loaded threads, the queues start out roughly even and starvation is unlikely at queuing time. The queues only become uneven during processing, when items of different lengths finish at different times, and that is when work stealing kicks in. The pool uses two separate schedulers: the first has a fixed size of twice the core count and holds only the pool's own threads, while the second can grow and shrink and holds threads created outside the pool. As mentioned, all smart threads need to participate in scheduling and work stealing to be effective, so a smart thread spawned by user code registers itself with the pool, which can yield a performance benefit for the user code.

The heap is a standard min-heap implementation with nothing tricky inside, so it is skipped in this article; it is included in the full source code.
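For readers who want to compile the listings without the full source, here is a hypothetical stand-in: an array-backed min-heap whose member names (`items`, `BuildHeap`, `ReadMax`, `Insert`, `GetSize`, `SetSize`) mirror the calls SmartThreadPool makes. It is a sketch under those assumptions, not the author's Heap class. With `SmartThread.CompareTo` comparing queue lengths, the root is the least-loaded thread, and the most-loaded thread (the steal target) is always one of the leaves.

```csharp
using System;

// Hypothetical min-heap matching the members SmartThreadPool uses.
public class Heap<T> where T : IComparable<T>
{
    public T[] items;      // heap-ordered; items[0] is the smallest element
    private int size;

    public Heap( T[] initial )
    {
        items = (T[])initial.Clone();
        size = items.Length;
        BuildHeap();
    }

    public int GetSize() { return size; }
    public void SetSize( int newSize ) { size = newSize; }

    // Floyd's bottom-up construction: sift down every internal node, O(n).
    public void BuildHeap()
    {
        for ( int i = size / 2 - 1; i >= 0; i-- )
            SiftDown( i );
    }

    // In a min-heap the maximum is always a leaf (indices size/2 .. size-1),
    // so only about half of the array has to be scanned.
    public T ReadMax()
    {
        if ( size == 0 )
            return default( T );
        int best = size / 2;
        for ( int i = best + 1; i < size; i++ )
            if ( items[ i ].CompareTo( items[ best ] ) > 0 )
                best = i;
        return items[ best ];
    }

    // Grows the array by exactly one slot, so that items.Length == size,
    // which is how SmartThreadPool treats the array.
    public void Insert( T item )
    {
        Array.Resize( ref items, size + 1 );
        items[ size ] = item;
        SiftUp( size );
        size++;
    }

    private void SiftDown( int i )
    {
        while ( true )
        {
            int left = 2 * i + 1, right = left + 1, smallest = i;
            if ( left < size && items[ left ].CompareTo( items[ smallest ] ) < 0 ) smallest = left;
            if ( right < size && items[ right ].CompareTo( items[ smallest ] ) < 0 ) smallest = right;
            if ( smallest == i ) return;
            Swap( i, smallest );
            i = smallest;
        }
    }

    private void SiftUp( int i )
    {
        while ( i > 0 )
        {
            int parent = ( i - 1 ) / 2;
            if ( items[ i ].CompareTo( items[ parent ] ) >= 0 ) return;
            Swap( i, parent );
            i = parent;
        }
    }

    private void Swap( int a, int b ) { T tmp = items[ a ]; items[ a ] = items[ b ]; items[ b ] = tmp; }
}
```

Returning `default( T )` (null for SmartThread) from an empty heap is an assumption of this sketch; a caller such as `TryStealWork` should treat null as "nothing to steal".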

Performance

With all that in place, and before building the Task abstraction on top of it, let's test the raw performance of this solution against the .NET 4.0 thread pool, which uses similar concepts. The tests were run in separate processes, as running both pools in a single process could let one pool affect the other.

using System;
using System.Diagnostics;
using System.Threading;

class Program
{
    public static ManualResetEvent ev = new ManualResetEvent( false );
    public static int MAX = 200;          // work items per test run
    public static int CountFast1 = 2000;  // loop length of a short-running item
    public static int CountFast2 = 10000; // loop length of a long-running item
    static int cnt = 0;

    static void Main( string[] args )
    {
        long t1, t2, t3;

        // Swap the commented lines to benchmark SmartThreadPool instead
        // (each pool is measured in its own process).
        Console.WriteLine( "1)\n" );

        t1 = DoTests();
        //t1 = DoTestsSmart();

        Console.WriteLine( "2)\n" );
        Thread.Sleep( 100 );

        t2 = DoTests();
        //t2 = DoTestsSmart();

        Console.WriteLine( "3)\n" );
        Thread.Sleep( 100 );

        t3 = DoTests();
        //t3 = DoTestsSmart();

        Console.WriteLine( "AVG: " + ( t1 + t2 + t3 ) / 3 );
        Console.ReadKey();
    }

    static long DoTestsSmart()
    {
        ev = new ManualResetEvent( false );
        cnt = 0;

        Console.WriteLine( "SmartThreadPool GO:" );

        Stopwatch w = new Stopwatch();
        w.Start();

        // Every fifth item is long-running, the rest are short.
        for ( int i = 0; i < MAX; i++ )
        {
            if ( i % 5 == 0 )
                SmartThreadPool.QueueWorkItem( new Action( ComputeLongRunning ) );
            else
                SmartThreadPool.QueueWorkItem( new Action( ComputeShortRunning ) );
        }
        ev.WaitOne();

        w.Stop();
        Console.WriteLine( "SmartThreadPool: " + w.ElapsedMilliseconds );
        ev.Reset();

        return w.ElapsedMilliseconds;
    }

    static long DoTests()
    {
        ev = new ManualResetEvent( false );
        cnt = 0;

        Console.WriteLine( "ThreadPool GO:" );

        Stopwatch w = new Stopwatch();
        w.Start();

        // Every fifth item is long-running, the rest are short.
        for ( int i = 0; i < MAX; i++ )
        {
            if ( i % 5 == 0 )
                ThreadPool.QueueUserWorkItem( new WaitCallback( ComputeLongRunning ) );
            else
                ThreadPool.QueueUserWorkItem( new WaitCallback( ComputeShortRunning ) );
        }
        ev.WaitOne();

        w.Stop();
        Console.WriteLine( "ThreadPool: " + w.ElapsedMilliseconds );
        ev.Reset();

        return w.ElapsedMilliseconds;
    }

    // Action-compatible overloads for SmartThreadPool.
    static void ComputeShortRunning()
    {
        ComputeShortRunning( null );
    }

    static void ComputeLongRunning()
    {
        ComputeLongRunning( null );
    }

    // WaitCallback-compatible overloads for ThreadPool.QueueUserWorkItem.
    static void ComputeShortRunning( object o )
    {
        // Deliberately slow string concatenation to burn CPU time.
        string s = string.Empty;

        for ( int i = 0; i < CountFast1; i++ )
        {
            s += i.ToString();
        }

        // Use the value returned by Increment, so exactly one item observes MAX.
        if ( Interlocked.Increment( ref cnt ) == MAX )
        {
            ev.Set();
        }
    }

    static void ComputeLongRunning( object o )
    {
        string s = string.Empty;

        for ( int i = 0; i < CountFast2; i++ )
        {
            s += i.ToString();
        }

        if ( Interlocked.Increment( ref cnt ) == MAX )
        {
            ev.Set();
        }
    }
}

The code assumes that most of the work is expressed as short-running tasks, with something heavy to process from time to time (every fifth item is long-running). Before each test, the code sets up a wait handle (ManualResetEvent) that is signalled once all work items have finished. Each process runs the test method three times and averages the results, to reduce noise from application start-up, JIT compilation (including inlining) and garbage collections.
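The "counter plus ManualResetEvent" completion check can also be expressed with `CountdownEvent`, which has been in `System.Threading` since .NET 4.0. The sketch below (the `BenchmarkSketch` name and `TimeWorkItems` signature are hypothetical) times a batch of items on the .NET thread pool; each item signals once, and `Wait()` returns after all of them have signalled.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical timing helper using CountdownEvent for completion detection.
public static class BenchmarkSketch
{
    // Runs 'count' items on the .NET thread pool and returns the elapsed milliseconds.
    public static long TimeWorkItems( int count, Action<int> work )
    {
        var done = new CountdownEvent( count );
        Stopwatch watch = Stopwatch.StartNew();

        for ( int i = 0; i < count; i++ )
        {
            int item = i;                 // copy for the closure
            ThreadPool.QueueUserWorkItem( _ =>
            {
                work( item );
                done.Signal();            // count reaches zero after the last item
            } );
        }

        done.Wait();                      // blocks until every item has signalled
        watch.Stop();
        return watch.ElapsedMilliseconds;
    }
}
```

This removes the shared `cnt` field and the need to recreate the event before every run; the measured region is the same (queuing plus execution of all items).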

The performance of the .NET 4 thread pool is:

[screenshot not available]

The performance of SmartThreadPool is:

[screenshot not available]


The difference is not huge, but it is noticeable, and in performance-critical applications it can be quite beneficial. Why the difference? The answer is not simple: much of the .NET 4 thread pool's queuing logic is managed code, but thread creation and injection are handled inside the unmanaged runtime, so a decompiler such as Reflector only shows part of the picture. One possible reason is that my code spins on short timed waits before blocking on a signal, which can be a big gain over simply blocking (although even after changing that code, SmartThreadPool still performed better). Another reason might be better work-stealing logic, but that's a bit of a stretch, as my approach can be improved upon considerably; for example, work could be stolen in batches.
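Batch stealing could look roughly like this: instead of taking one item per lock acquisition, as `TryStealWork` does, the thief takes half of the victim's queue under a single lock and runs the batch locally. `BatchSteal.StealHalf` is a hypothetical helper, not part of SmartThread. Returning the batch, rather than moving it into the thief's own queue, avoids holding two queue locks at once, which could deadlock if two threads steal from each other.

```csharp
using System.Collections.Generic;

// Hypothetical batch-stealing helper.
public static class BatchSteal
{
    // Removes up to half (rounded up) of the victim's items under one lock.
    // The caller runs the returned items outside the lock.
    public static List<T> StealHalf<T>( Queue<T> victim, object victimLock )
    {
        var batch = new List<T>();
        lock ( victimLock )
        {
            int take = ( victim.Count + 1 ) / 2;
            for ( int i = 0; i < take; i++ )
                batch.Add( victim.Dequeue() );   // oldest items first (FIFO)
        }
        return batch;
    }
}
```

In SmartThread terms, the victim would be `threadToSteal.scheduler` and the lock `threadToSteal.locker`. Taking half is a common heuristic: it quickly evens out two queues while leaving the victim enough work to continue.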

Task model

With most of the components in place we can start implementing a simple (for now) task model. But why would we want a task programming model at all? It provides a very nice abstraction over scheduling and parallel execution, among other things. When thinking in tasks, we just create tasks as units of execution, and the underlying code decides whether a task runs on another thread, as a fiber, or queued up on a single thread; the same kind of logic can be applied to other aspects of a task, such as logging or mutability.

To introduce tasks, we replace the Action parameter of the SmartThread and SmartThreadPool methods with a Task, and instead of invoking an action, the threads call InternalRun on the new Task class, which looks as follows:

    using System;
    using System.Threading;

    // TaskStatus (NotStarted, Running, Done, Error) and TaskException
    // are part of the full source code and are not shown here.

    /// <summary>
    /// Represents a Task.
    /// </summary>
    public class Task
    {
        private readonly ManualResetEvent wait = new ManualResetEvent( false );
        private readonly Action action;
        internal TaskStatus status;
        private TaskException ex;

        /// <summary>
        /// Initializes a task by passing the Action to be executed by it.
        /// </summary>
        /// <param name="action">action to be executed.</param>
        public Task( Action action )
        {
            this.action = action;
            status = TaskStatus.NotStarted;
        }

        /// <summary>
        /// Blocks the current thread until the task's work is done.
        /// </summary>
        public void Wait()
        {
            wait.WaitOne();

            // Exceptions only surface if someone calls Wait(); a task that is never
            // waited on fails silently, although waiting is good practice.
            //TODO find a better way to handle that.
            if ( status == TaskStatus.Error )
                throw ex;
        }

        /// <summary>
        /// Runs the task synchronously.
        /// </summary>
        public void RunSynchronously()
        {
            RunSynchronously( new Task[ 0 ] );
        }

        internal TaskStatus InternalRun()
        {
            try
            {
                status = TaskStatus.Running;
                action();
                status = TaskStatus.Done;
            }
            catch ( Exception e )
            {
                // We need to capture exceptions from other threads, as we don't want to
                // crash the pool's threads; they are rethrown by Wait() or RunSynchronously().
                ex = new TaskException( "InternalRun", e );
                status = TaskStatus.Error;
            }
            finally
            {
                // Always release waiters, also when the action failed. The status is
                // set first, so Wait() observes the final value.
                wait.Set();
            }

            return status;
        }

        /// <summary>
        /// Runs the task asynchronously.
        /// </summary>
        public void RunAsynchronously()
        {
            RunAsynchronously( new Task[ 0 ] );
        }

        /// <summary>
        /// Runs the task synchronously.
        /// </summary>
        /// <param name="tasksToWaitFor">the list of tasks to wait for, before running.</param>
        public void RunSynchronously( params Task[] tasksToWaitFor )
        {
            WaitForCompletion( tasksToWaitFor );
            status = TaskStatus.Running;
            if ( InternalRun() == TaskStatus.Error )
                throw ex;
        }

        /// <summary>
        /// Runs the task asynchronously.
        /// </summary>
        /// <param name="tasksToWaitFor">the list of tasks to wait for, before running
        /// (the calling thread blocks until they complete).</param>
        public void RunAsynchronously( params Task[] tasksToWaitFor )
        {
            WaitForCompletion( tasksToWaitFor );
            status = TaskStatus.Running;
            // Requires a QueueWorkItem overload that accepts a Task.
            SmartThreadPool.QueueWorkItem( this );
        }

        /// <summary>
        /// Creates a task.
        /// </summary>
        /// <param name="action">The action to be executed by a task.</param>
        /// <returns>Task.</returns>
        public static Task Create( Action action )
        {
            return new Task( action );
        }

        private void WaitForCompletion( Task[] tasks )
        {
            foreach ( Task task in tasks )
                task.Wait();
        }
    }
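A self-contained way to experiment with the same idea without the full source is sketched below. `MiniTask` is a hypothetical class that runs on the .NET ThreadPool instead of SmartThreadPool. It shows two design points: completion is signalled in a `finally` block, so `Wait()` returns (and rethrows) even when the action fails, and dependencies are awaited on the worker thread rather than on the caller. That keeps `RunAsynchronously` non-blocking, at the cost of tying up a pool thread while it waits.

```csharp
using System;
using System.Threading;

// Hypothetical minimal task on the .NET ThreadPool.
public sealed class MiniTask
{
    private readonly ManualResetEvent done = new ManualResetEvent( false );
    private readonly Action action;
    private Exception error;

    public MiniTask( Action action ) { this.action = action; }

    public void RunAsynchronously( params MiniTask[] dependencies )
    {
        ThreadPool.QueueUserWorkItem( _ =>
        {
            try
            {
                foreach ( MiniTask d in dependencies )
                    d.Wait();                 // a failed dependency fails this task too
                action();
            }
            catch ( Exception e ) { error = e; }
            finally { done.Set(); }           // always release waiters
        } );
    }

    // Blocks until the task has finished and rethrows its failure, if any.
    public void Wait()
    {
        done.WaitOne();
        if ( error != null )
            throw new AggregateException( error );
    }
}
```

A typical use is `b.RunAsynchronously( a ); a.RunAsynchronously(); b.Wait();`, where `b` only runs after `a` has completed, even though it was queued first.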


Summary

Work stealing as a technique can be used in many places, such as collections or business logic (banking, for example), to boost the performance of concurrent code. This article just scratched the surface of what could be done with SmartThreads, work stealing and task-based programming models; there is certainly much more to explore and improve upon, which I will try to do in future articles.


The full source code of this article can be found here.
