2011-03-05

Semaphore with FIFO order and priorities.

Semaphore with FIFO order and priorities. Also support cancelation token.

public class OrderedList<K, V>
{
    private SortedList<K, List<V>> m_list = new SortedList<K, List<V>>();

    public void Add(K a_key, V a_value)
    {
        if (Values.Contains(a_value))
            throw new Exception();

        if (!Keys.Contains(a_key))
            m_list.Add(a_key, new List<V>());

        m_list[a_key].Add(a_value);
    }

    public IEnumerable<V> Values
    {
        get
        {
            foreach (var sublist in m_list.Values)
                foreach (var v in sublist)
                    yield return v;
        }
    }

    public IEnumerable<K> Keys
    {
        get
        {
            return m_list.Keys;
        }
    }

    public void RemoveByValue(V a_value)
    {
        foreach (var sublist in m_list.Values)
        {
            if (sublist.Remove(a_value))
                break;
        }
    }

    public int Count 
    {
        get
        {
            return Values.Count();
        }
    }

    public V RemoveFirst()
    {
        V v = Values.First();
        RemoveByValue(v);
        return v;
    }
}

public class QueuedSemaphore<P>
{
    private Object m_lock = new Object();
    private OrderedList<P, ManualResetEvent> m_queue =
        new OrderedList<P, ManualResetEvent>();
    private int m_working = 0;
    private readonly int m_count;

    public QueuedSemaphore(int a_count)
    {
        m_count = a_count;
    }

    public void WaitOne(CancellationToken a_token, P a_priority)
    {
        ManualResetEvent mre = null;

        lock (m_lock)
        {
            if (m_working == m_count)
            {
                mre = new ManualResetEvent(false);
                m_queue.Add(a_priority, mre);
            }
            else
                m_working++;
        }

        if (mre != null)
        {
            while (!mre.WaitOne(100))
            {
                if (a_token.IsCancellationRequested)
                {
                    lock (m_lock)
                    {
                        if (mre.WaitOne(0))
                            Release();
                        else if (m_queue.Values.Contains(mre))
                            m_queue.RemoveByValue(mre);
                    }
                    a_token.ThrowIfCancellationRequested();
                }
            }
        }
    }

    public void WaitOne(P a_priority)
    {
        ManualResetEvent mre = null;

        lock (m_lock)
        {
            if (m_working == m_count)
            {
                mre = new ManualResetEvent(false);
                m_queue.Add(a_priority, mre);
            }
            else
                m_working++;
        }

        if (mre != null)
            mre.WaitOne();
    }

    public void Release()
    {
        lock (m_lock)
        {
            if (m_queue.Count != 0)
            {
                m_queue.RemoveFirst().Set();
            }

            if (m_queue.Count < m_working)
                m_working = m_queue.Count;
        }
    }
}

Brak komentarzy:

Prześlij komentarz