这是无锁的.NET队列线程安全的?这是、队列、线程、安全

由网友(故事再撩人也只是故事)分享简介:我的问题是,下面是包括了一个阅读器单写队列级的线程安全类的?这种队列称为无锁,即使如果队列装满它会阻止。该数据结构的灵感来自于Marc Gravell的实现阻塞队列的在这里计算器。的结构的点是允许一个线程将数据写入缓冲器中,而另一个线程来读取数据。所有这一切都需要尽可能快地发生。 有一个类似的数据结构,在DDJ的文章香...

我的问题是,下面是包括了一个阅读器单写队列级的线程安全类的?这种队列称为无锁,即使如果队列装满它会阻止。该数据结构的灵感来自于Marc Gravell的实现阻塞队列的在这里计算器。

的结构的点是允许一个线程将数据写入缓冲器中,而另一个线程来读取数据。所有这一切都需要尽可能快地发生。

有一个类似的数据结构,在DDJ的文章香草萨特,除了实现在C ++中。另一个区别是,我使用的是香草链表,​​我使用数组的链表。

而不是仅仅包括code片段我有整个事情与一个宽容的开源许可评论(MIT许可1.0)的情况下,任何人发现它是有用的,并希望使用它(原样或修改)。

这是有关询问关于如何创建一个并发阻塞队列(见的在创建.NET 和Thread-safe在.NET 阻塞队列实现)。

线程安全和锁

下面是code:

 使用系统;
使用System.Collections.Generic;
使用的System.Threading;
使用System.Diagnostics程序;

命名空间CollectionSandbox
{
    ///这是实现一个单一的读/辛格勒作家缓冲队列
    ///与(几乎)没有锁。如果填充该实现只阻止
    /// 向上。实施是阵列的一个链表。
    ///它是由欲望启发,创造一个非阻塞版本
    ///由Marc Gravell在C#中的阻塞队列落实
    /// http://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    类SimpleSharedQueue< T> :IStreamBuffer< T>
    {
        ///用于信号的东西都不再满
        ManualResetEvent的canWrite =新的ManualResetEvent(真正的);

        ///这是一个缓冲区的大小
        const int的BUFFER_SIZE = 512;

        ///这是节点的最大数目。
        const int的MAX_NODE_COUNT = 100;

        ///这标志着将新数据写入到的位置。
        光标加法器;

        ///这标志着从读取新数据的位置。
        光标卸妆;

        ///指示没有更多的数据将被写入该节点。
        公共布尔完成= FALSE;

        ///节点是数据项的数组,指针下一个项目,
        ///和在被占领的项目数的索引
        类节点
        {
            ///当数据被存储。
            大众T []数据=新的T [BUFFER_SIZE];

            ///当前存储在节点的数据项的数量。
            下一个公开节点;

            ///当前存储在节点的数据项的数量。
            公众诠释计数;

            ///默认的构造方法,仅适用于第一个节点。
            公共节点()
            {
                计数= 0;
            }

            ///只会被调用由作家将新节点添加到场景
            公共节点(T X,节点preV)
            {
                数据[0] = X;
                数= 1;

                //将previous节点安全地更新以指向这个节点。
                //读者可以看的地步,而我们将其设置,所以这应该是
                // 原子。
                Interlocked.Exchange(参考prev.next,这一点);
            }
        }

        ///这是用来指向单个节点内的位置,并且可以执行
        ///读或作家。一个光标将只读取,而另一个光标只会
        ///曾经写​​。
        类光标
        {
            ///指向父队列
            公共SimpleSharedQueue< T> q;

            ///当前节点
            公共节点节点;

            ///对于作家,这点到该下一个项目将被写入的位置。
            ///对于一个阅读器,这个指向该下一个项目将被读取的位置。
            公众诠释电流= 0;

            ///创建一个新的光标,指向节点
            公共光标(SimpleSharedQueue< T> Q,节点的节点)
            {
                this.q = Q;
                this.node =节点;
            }

            ///用于推动更多的数据到队列
            公共无效写入(T X)
            {
                Trace.Assert(当前== node.count);

                //检查我们是否在节点的限制,并且将需要分配一个新的缓冲区。
                如果(当前== BUFFER_SIZE)
                {
                    //检查队列已满
                    如果(q.IsFull())
                    {
                        //信号canWrite事件为假
                        q.canWrite.Reset();

                        //等待,直到canWrite事件信号
                        q.canWrite.WaitOne();
                    }

                    //创建一个新的节点
                    节点=新节点(X,节点);
                    电流= 1;
                }
                其他
                {
                    //如果实现为正确,则读者将从未尝试访问此
                    //当我们将它设置数组位置。这是因为不变量的那
                    //如果读取器和写入器处于相同节点:
                    // reader.current< node.count
                    // 和
                    // writer.current = node.count
                    node.data [电流++] = X;

                    //我们必须使用互锁,以确保我们incremeent计数
                    // atomicalluy,因为读者可以阅读它。
                    Interlocked.Increment(REF node.count);
                }
            }

            ///从队列中提取数据,只有当返回false
            /// 那里
            公共BOOL读(REF牛逼X)
            {
                而(真)
                {
                    如果(电流I node.count)
                    {
                        X = node.data [电流++];
                        返回true;
                    }
                    否则,如果((当前== BUFFER_SIZE)及和放大器;!(node.next = NULL))
                    {
                        //将当前节点到下一个。
                        //我们知道它是安全的这样做。
                        //旧节点将有它没有更多引用它
                        //并会被垃圾收集器删除。
                        节点= node.next;

                        //如果有写线程等待队列,
                        //然后松开。
                        //概念上有一个如果(q.IsFull),但我们不能把它
                        //因为这会导致竞争条件。
                        q.canWrite.Set();

                        //指向第一个点
                        电流= 0;

                        //其中一个不变的是,每一个节点之后创建的第一个,
                        //将具有至少一个项目。所以下面的调用是安全的
                        X = node.data [电流++];
                        返回true;
                    }

                    //如果我们到这里,我们已经看过最新添加的数据。
                    //然后我们检查,看看是否作家已经完成生产数据。
                    如果(q.completed)
                        返回false;

                    //如果我们在这里没有数据的等待,并没有萎靡不振完成的线程。
                    //等待一毫秒。该系统还将上下文切换。
                    //这将允许写入线程一些额外的资源,以泵出
                    //更多的数据(特别是如果它iself是多线程)
                    Thread.sleep代码(1);
                }
            }
        }

        ///返回当前使用的节点的数目。
        私人诠释NodeCount
        {
            得到
            {
                INT结果为0;
                节点CUR = NULL;
                Interlocked.Exchange<节点>(REF CUR,remover.node);

                //对从去除所有节点的加法器
                //效率不高,但是这不是经常调用。
                而(CUR!= NULL)
                {
                    ++结果;
                    Interlocked.Exchange<节点>(REF CUR,cur.next);
                }
                返回结果;
            }
        }

        ///构建队列。
        公共SimpleSharedQueue()
        {
            节点根=新节点();
            加法器=新的光标(本,根);
            卸妆=新的光标(本,根);
        }

        ///指示给读者,没有更多的数据是要被写入。
        公共无效MarkCompleted()
        {
            完成=真;
        }

        ///读取下一个数据。返回false,如果没有更多的数据。
        公共BOOL读(REF牛逼X)
        {
            返回remover.Read(REF X);
        }

        ///写入更多的数据。
        公共无效写入(T X)
        {
            adder.Write(X);
        }

        ///告诉我们,如果有太多的节点,并且可以不加了。
        私人布尔IsFull()
        {
            返回NodeCount == MAX_NODE_COUNT;
        }
    }
}
 

解决方案

微软研究院CHESS应被证明是用于测试你实现一个很好的工具。

My question is, is the class included below for a single-reader single-writer queue class thread-safe? This kind of queue is called lock-free, even if it will block if the queue is filled. The data structure was inspired by Marc Gravell's implementation of a blocking queue here at StackOverflow.

The point of the structure is to allow a single thread to write data to the buffer, and another thread to read data. All of this needs to happen as quickly as possible.

A similar data structure is described in an article at DDJ by Herb Sutter, except the implementation is in C++. Another difference is that I use a vanilla linked list, I use a linked list of arrays.

Rather than just including a snippet of code I include the whole thing with comment with a permissive open source license (MIT License 1.0) in case anyone finds it useful, and wants to use it (as-is or modified).

This is related to other questions asked on Stack Overflow of how to create a blocking concurrent queues (see Creating a blockinq Queue in .NET and Thread-safe blocking queue implementation in .NET).

Here is the code:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / singler writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// http://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the node.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next item,
        /// and in index of the number of occupied items 
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The number of data items currently stored in the node.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the scene
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could looking at the point, while we set it, so this should be 
                // atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writers. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use interlocked, to assure that we incremeent the count 
                    // atomicalluy, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue, returns false only if 
            /// there 
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it it 
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is a "if (q.IsFull)", but we can't place it 
                        // because that would lead to a Race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first,
                        // will have at least one item. So the following call is safe
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and no flagging of the completed thread.
                    // Wait a millisecond. The system will also context switch. 
                    // This will allow the writing thread some additional resources to pump out 
                    // more data (especially if it iself is multithreaded)
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes, and can't add anymore.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}

解决方案

Microsoft Research CHESS should prove to be a good tool for testing your implementation.

阅读全文

相关推荐

最新文章