Monday, December 6, 2010

Stream ByteLock

Having created an improved stream class with various much-needed methods for building DataStores, I needed a locking mechanism that locks only a section of a stream or array (or of anything else that can be divided into a set of elements). I googled and found nothing of use.

What Is It for?


It is meant for situations where many threads share a stream or an array. I intend to use it with a FileStream that holds no lock on the file handle, so multiple threads can write to the file and each one blocks only the sections it needs. The file structure will be a binary tree, so threads will be blocking branches.

It should also be a lot faster than a waiting queue on the whole file, because threads working on different sections do not have to wait for each other.

Win32 Solution

A partial solution is to use the Win32 API function LockFileEx, which has the following signature:

BOOL WINAPI LockFileEx(
  __in        HANDLE hFile,
  __in        DWORD dwFlags,
  __reserved  DWORD dwReserved,
  __in        DWORD nNumberOfBytesToLockLow,
  __in        DWORD nNumberOfBytesToLockHigh,
  __inout     LPOVERLAPPED lpOverlapped
);

But this needs unmanaged calls and working with an API that takes many parameters and is hard to use. On the plus side, it's very fast and more or less guaranteed to work as planned.
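
To give an idea of what the unmanaged route involves, here is a rough sketch of a P/Invoke wrapper around LockFileEx and UnlockFileEx. It is Windows only and was not part of my project; the names NativeFileLock, TryLock, Unlock, Low and High are made up for this illustration, and the flag values are the ones defined in the Windows headers. Note that the start offset of the region is passed through the OVERLAPPED structure, not as a direct parameter.

```csharp
using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;

// Hypothetical helper for illustration. Windows only.
public static class NativeFileLock
{
    private const uint LOCKFILE_FAIL_IMMEDIATELY = 0x00000001;
    private const uint LOCKFILE_EXCLUSIVE_LOCK = 0x00000002;

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool LockFileEx(SafeFileHandle hFile, uint dwFlags, uint dwReserved,
        uint nNumberOfBytesToLockLow, uint nNumberOfBytesToLockHigh,
        ref NativeOverlapped lpOverlapped);

    [DllImport("kernel32.dll", SetLastError = true)]
    [return: MarshalAs(UnmanagedType.Bool)]
    private static extern bool UnlockFileEx(SafeFileHandle hFile, uint dwReserved,
        uint nNumberOfBytesToUnlockLow, uint nNumberOfBytesToUnlockHigh,
        ref NativeOverlapped lpOverlapped);

    // The Win32 API takes 64-bit values as two 32-bit halves.
    public static uint Low(long value) { return unchecked((uint)(value & 0xFFFFFFFFL)); }
    public static uint High(long value) { return unchecked((uint)((value >> 32) & 0xFFFFFFFFL)); }

    // The start offset of the region travels in the OVERLAPPED structure.
    private static NativeOverlapped OffsetOf(long offset)
    {
        NativeOverlapped overlapped = new NativeOverlapped();
        overlapped.OffsetLow = unchecked((int)Low(offset));
        overlapped.OffsetHigh = unchecked((int)High(offset));
        return overlapped;
    }

    // Tries to take an exclusive lock on [offset, offset + length) without waiting.
    public static bool TryLock(FileStream file, long offset, long length)
    {
        NativeOverlapped overlapped = OffsetOf(offset);
        return LockFileEx(file.SafeFileHandle,
            LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY, 0,
            Low(length), High(length), ref overlapped);
    }

    // Releases a region previously locked with the same offset and length.
    public static bool Unlock(FileStream file, long offset, long length)
    {
        NativeOverlapped overlapped = OffsetOf(offset);
        return UnlockFileEx(file.SafeFileHandle, 0, Low(length), High(length), ref overlapped);
    }
}
```

When TryLock returns false, Marshal.GetLastWin32Error() should tell why. Even in this small form, the amount of ceremony shows why I wanted something simpler.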

But I want to be OS independent (database engines are, and I'm creating one that needs this functionality), and besides, I want 100% managed code. With those requirements I didn't find any out-of-the-box solution to this problem, so I had to write my own from the ground up.

My Own ByteLock

My own version is just a simple static class that uses the most basic synchronization mechanism. There is nothing fancy in it, to keep the locking code as simple as possible, because writing such code properly is very, VERY hard: deadlocks, race conditions and other surprises can happen, and they are hard to find.

So let's look at the code:

using System;
using System.Collections.Generic;
using System.Threading;

public class Range
    {
        public long StartIndex { get; set; }
        public long EndIndex { get; set; }
     
        public Range(long startIndex, long endIndex)
        {
            this.StartIndex = startIndex;
            this.EndIndex = endIndex;
        }
    }

    public class RangeLock : Range
    {
        public RangeLock() : base(-1, -1)
        {
            this.ThreadId = Thread.CurrentThread.ManagedThreadId;
        }

        public RangeLock(Range range) : base(range.StartIndex, range.EndIndex)
        {
            this.ThreadId = Thread.CurrentThread.ManagedThreadId;
        }

        public int ThreadId {get; set;}
    }


    /// <summary>
    /// Byte Lock class that performs, locking on specified bytes in the stream,
    /// but can be used to perform element locking in arrays.
    /// </summary>
    public static class ByteLock
    {    
        private static List<RangeLock> sections; 
        private static object _lock = new object();

        static ByteLock()
        {
            sections = new List<RangeLock>();
        }

        /// <summary>
        /// Locks the section.
        /// </summary>
        /// <param name="range">The range.</param>
        public static void LockSection(Range range)
        {
            lock (_lock)
            {
                // Wait until no held section intersects the requested range.
                // The condition is re-checked after every wake-up.
                while (IsInSections(range.StartIndex, range.EndIndex))
                {
                    Monitor.Wait(_lock);
                }

                sections.Add(new RangeLock(range));
            }
        }

        /// <summary>
        /// Writes to some stream/or array.
        /// </summary>
        /// <param name="range">The range.</param>
        /// <param name="action">The action.</param>
        public static void WriteTo(Range range, Action<Range> action)
        {
            int currentThreadId = Thread.CurrentThread.ManagedThreadId;

            lock (_lock)
            {
                // Wait while another thread holds a section that intersects the range.
                // Sections held by the calling thread do not block it.
                while (sections.Exists(x => x.ThreadId != currentThreadId &&
                    IsRangeInSection(range.StartIndex, range.EndIndex, x)))
                {
                    Monitor.Wait(_lock);
                }
            }

            // The action runs outside the internal lock, so only a prior
            // LockSection call keeps other threads out of the range while it runs.
            action(range);
        }

        /// <summary>
        /// Unlocks the section.
        /// </summary>
        /// <param name="range">The range.</param>
        public static void UnLockSection(Range range)
        {
            lock (_lock)
            {
                RemoveSections(range.StartIndex, range.EndIndex);

                // Wake all waiters: a single Pulse could wake a thread whose range
                // is still blocked while a thread that could proceed keeps sleeping.
                Monitor.PulseAll(_lock);
            }
        }

        /// <summary>
        /// Determines whether [is in sections] [the specified start].
        /// </summary>
        /// <param name="start">The start.</param>
        /// <param name="end">The end.</param>
        /// <returns>
        ///     <c>true</c> if [is in sections] [the specified start]; otherwise, <c>false</c>.
        /// </returns>
        private static bool IsInSections(long start, long end)
        {
            foreach (var section in sections)
            {
                if (IsRangeInSection(start, end, section))
                    return true;
            }

            return false;
        }

        /// <summary>
        /// Finds the section.
        /// </summary>
        /// <param name="start">The start.</param>
        /// <param name="end">The end.</param>
        /// <returns></returns>
        private static RangeLock FindSection(long start, long end)
        {
            foreach (var section in sections)
            {
                if (IsRangeInSection(start, end, section))
                    return section;
            }

            return null;
        }

        /// <summary>
        /// Removes the sections.
        /// </summary>
        /// <param name="start">The start.</param>
        /// <param name="end">The end.</param>
        private static void RemoveSections(long start, long end)
        {
            int currentThreadId = Thread.CurrentThread.ManagedThreadId;

            // Only sections owned by the calling thread may be removed.
            sections.RemoveAll(section =>
                section.ThreadId == currentThreadId &&
                IsRangeInSection(start, end, section));
        }

        /// <summary>
        /// Determines whether [is range in section] [the specified start].
        /// </summary>
        /// <param name="start">The start.</param>
        /// <param name="end">The end.</param>
        /// <param name="section">The section.</param>
        /// <returns>
        ///     <c>true</c> if [is range in section] [the specified start]; otherwise, <c>false</c>.
        /// </returns>
        private static bool IsRangeInSection(long start, long end, RangeLock section)
        {
            if (start < section.StartIndex && end > section.StartIndex && end <= section.EndIndex)
                return true;
            if (start >= section.StartIndex && start < section.EndIndex && end > section.EndIndex)
                return true;
            if (start <= section.StartIndex && end >= section.EndIndex) return true;
            if (start >= section.StartIndex && end <= section.EndIndex) return true;

            return false;
        }
    }
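
The four comparisons in IsRangeInSection can be checked in isolation. As far as I can tell, for non-empty ranges they reduce to the usual overlap test for half-open intervals [start, end). The sketch below shows that compact form; OverlapDemo and Overlaps are names made up for this illustration and are not part of ByteLock.

```csharp
using System;

// Hypothetical helper for illustration only.
public static class OverlapDemo
{
    // Two half-open ranges [aStart, aEnd) and [bStart, bEnd) overlap
    // exactly when each one starts before the other one ends.
    public static bool Overlaps(long aStart, long aEnd, long bStart, long bEnd)
    {
        return aStart < bEnd && bStart < aEnd;
    }

    public static void Demo()
    {
        Console.WriteLine(Overlaps(0, 50, 50, 100));  // False: the ranges only touch
        Console.WriteLine(Overlaps(0, 50, 49, 100));  // True: index 49 is shared
        Console.WriteLine(Overlaps(10, 20, 0, 100));  // True: first range lies inside the second
        Console.WriteLine(Overlaps(0, 100, 10, 20));  // True: first range contains the second
    }
}
```

This is why Range(0, 50) and Range(50, 100) can be held by two threads at the same time: the end index is treated as exclusive.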

As you can see from the code, the class uses a single internal lock together with the Wait and Pulse pattern implemented by the Monitor class. ByteLock holds a private list of RangeLocks to know which thread is holding what. When a thread wants to write to or lock a given range, ByteLock checks whether that range intersects any section in the list; if it does, the thread is put into a waiting queue.
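
To show the waiting pattern on its own, here is a stripped-down sketch of the same idea that works on a plain byte array. It is not the ByteLock class: MiniRangeLock, Acquire, Release and MiniRangeLockDemo are names invented for this example, it does not track thread ids, and it wakes waiters with Monitor.PulseAll so that every waiting thread re-checks its own range.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified variant of the idea, for illustration only.
public sealed class MiniRangeLock
{
    private readonly object gate = new object();
    private readonly List<long[]> held = new List<long[]>(); // each entry: { start, end }

    public void Acquire(long start, long end)
    {
        lock (gate)
        {
            // Re-check after every wake-up: being pulsed only means
            // "something changed", not "your range is free".
            while (held.Exists(r => start < r[1] && r[0] < end))
            {
                Monitor.Wait(gate);
            }
            held.Add(new[] { start, end });
        }
    }

    public void Release(long start, long end)
    {
        lock (gate)
        {
            int index = held.FindIndex(r => r[0] == start && r[1] == end);
            if (index >= 0) held.RemoveAt(index);

            // Wake every waiter; each one re-evaluates its own range.
            Monitor.PulseAll(gate);
        }
    }
}

public static class MiniRangeLockDemo
{
    // Creates a thread that fills buffer[start..end) with value under the range lock.
    private static Thread Writer(MiniRangeLock locker, byte[] buffer, int start, int end, byte value)
    {
        return new Thread(() =>
        {
            locker.Acquire(start, end);
            try
            {
                for (int i = start; i < end; i++) buffer[i] = value;
            }
            finally
            {
                locker.Release(start, end);
            }
        });
    }

    // Two threads write to disjoint halves of the buffer without blocking each other.
    public static byte[] Run()
    {
        byte[] buffer = new byte[100];
        MiniRangeLock locker = new MiniRangeLock();
        Thread[] threads =
        {
            Writer(locker, buffer, 0, 50, 1),
            Writer(locker, buffer, 50, 100, 2),
        };
        foreach (Thread t in threads) t.Start();
        foreach (Thread t in threads) t.Join();
        return buffer;
    }
}
```

The important detail is the while loop around Monitor.Wait: a woken thread must test its condition again, because another thread may have taken an intersecting range in the meantime.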

The most common use will be for a thread to lock a section (or wait for the lock) and then run its reading or writing code on its own section. This will not block other threads that use different parts of the stream. A complete FileStream locking solution should also handle other processes wanting to write to the same file, not just multiple threads, so a mutex would be needed on top of ByteLock for full locking.
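
Because the lock, work, unlock sequence must always end with the unlock, even when the work throws, it may be worth wrapping it in a small disposable scope. The sketch below is one possible way to do that; SectionScope is a hypothetical helper that is not part of ByteLock, and it takes the acquire and release steps as delegates so it stays independent of the locking class.

```csharp
using System;

// Hypothetical helper: acquires on construction, releases exactly once on Dispose.
public sealed class SectionScope : IDisposable
{
    private Action release;

    public SectionScope(Action acquire, Action release)
    {
        if (acquire == null) throw new ArgumentNullException("acquire");
        if (release == null) throw new ArgumentNullException("release");

        // If acquiring throws, nothing is held, so nothing will be released.
        acquire();
        this.release = release;
    }

    public void Dispose()
    {
        // Clear the field first so a second Dispose call does nothing.
        Action toRun = release;
        release = null;
        if (toRun != null) toRun();
    }
}

// Intended use with the class from this post (same thread for lock and unlock):
//
//   using (new SectionScope(() => ByteLock.LockSection(range),
//                           () => ByteLock.UnLockSection(range)))
//   {
//       ByteLock.WriteTo(range, write);
//   }
```

The using statement compiles to a try/finally block, so the section is unlocked on every path out of the block.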

Sample Usage


private static DataStream dataStream = new DataStream(new MemoryStream());

        public void MethodOne(object nothing)
        {
            Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
            Range range = new Range(0,50);

            Thread.Sleep(100);

            ByteLock.LockSection(range);
            ByteLock.WriteTo(range, (x) =>
            {
                dataStream.Position = x.StartIndex;
                dataStream.WriteBytes((int)(x.EndIndex - x.StartIndex), 100);
            });
            ByteLock.UnLockSection(range);
        }

        public void MethodTwo(object nothing)
        {
            Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);

            Thread.Sleep(50);

            Range range = new Range(50,100);
            ByteLock.WriteTo(range, (x) =>
            {
                dataStream.Position = x.StartIndex;
                dataStream.WriteBytes((int)(x.EndIndex - x.StartIndex), 200);
            });
        }

        public void MethodThree(object nothing)
        {
            Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
            Range range = new Range(0, 50);

            ByteLock.LockSection(range);
            Thread.Sleep(100);
            ByteLock.WriteTo(range, (x) =>
            {
                dataStream.Position = x.StartIndex;
                dataStream.WriteBytes((int)(x.EndIndex - x.StartIndex), 250);
            });
            ByteLock.UnLockSection(range);
        }

        static void Main(string[] args)
        {
            Program p = new Program();

            Thread t1 = new Thread(new ParameterizedThreadStart(p.MethodOne));
            Thread t2 = new Thread(new ParameterizedThreadStart(p.MethodTwo));
            Thread t3 = new Thread(new ParameterizedThreadStart(p.MethodThree));

            t1.Start();
            t2.Start();
            t3.Start();
        }

There is nothing to explain here ;-), just some basic, silly code to show how to use it.

Summing Up


This is a simple implementation of a class that is badly needed but missing in .NET, so I thought I should fill the gap. I will be releasing it along with the DataStream class and some other support classes as a CodePlex project, but I need more classes to form a 'real' project. So for now the full source code for ByteLock can be found in this post, but only partial code samples are present for DataStream, as the class is very big in terms of lines of code. Either I will put it on CodePlex in time or find another place to post it.
