与有序输入Parallel.ForEach?Parallel、ForEach

由网友(滿目星河)分享简介:我在程序中使用的 Parallel.ForEach()语句。它使用了一些对象作为输入的列表。我不关心输出的命令,但我需要这个环带以相同的顺序输入元件在输入列表中。是否有可能与 Parallel.ForEach()实现这一点?I have a Parallel.ForEach() statement used in m...

我在程序中使用的 Parallel.ForEach()语句。它使用了一些对象作为输入的列表。我不关心输出的命令,但我需要这个环带以相同的顺序输入元件在输入列表中。是否有可能与 Parallel.ForEach()实现这一点

I have a Parallel.ForEach() statement used in my program. It uses a list of some objects as input. I don't care about output order, but I need this loop to take input elements in the same order as in the input list. Is it possible to achieve this with Parallel.ForEach()?

推荐答案

在你从你的的IEnumerable&LT需要preserve秩序的情况下,T> ,你可能会想实施的 OrderablePartitioner< T> 品种。这个例子code这个类包含一个简单的例子,检索它们逐一从枚举时间递增的顺序。

In the case where you need to preserve order from your IEnumerable<T>, you'll likely want to implement a custom partitioner of the OrderablePartitioner<T> variety. The example code for this class includes a simple example which retrieves them one at a time from the enumeration in increasing order.

不过,这是很多的工作的一些东西,可能是一个简单的生产者 - 消费者模型类似的 ConcurrentQueue&LT; T&GT;

However, this is a lot of work for something which could be a simple producer-consumer model with something like ConcurrentQueue<T>:

var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
    X x;
    while (queue.TryDequeue(out x))
    {
        x.Frob();
    }
};

// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

使用这个code,你会得到保证先入先出的行为,你的请求最终处理飞行近的顺序被接收。一旦放置在平行的,你就没有保证他们留在顺序。

Using this code you'll be guaranteed First-In-First-Out behavior, and that your requests end up processed "in flight" nearly in the order they are received. Once placed in parallel, you'll have no guarantee they stay in sequential order.

另外,您也可以使用以下(与队列项目的数量保持固定的限制):

Alternatively, you can use the following (with the restriction that the number of queue items stays fixed):

// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
    queue,
    _ =>
    {
        X x;
        if (queue.TryDequeue(out x))
        {
            x.Frob();
        }
    });

如果您想继续生产在一个线程中,与消费在其他国家使用的 BlockingCollection&LT; T&GT; 与队列作为其背后的集合:

If you would like to keep on producing in one thread, and consuming in others use a BlockingCollection<T> with a queue as its backing collection:

var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());

// add to it
Task.Factory.StartNew( () =>
    {
         foreach (var x in yourEnumerableOfX)
         {
             queue.Add(x);
             Thread.Sleep(200);
         }

         // Signal to our consumers we're done:
         queue.CompleteAdding();
    });

现在我们需要'无界'的消费者,因为我们不能确定到底有多少队列中的项目可能是present:

Now we need 'unbounded' consumers since we are unsure exactly how many queue items may be present:

// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
    while (!queue.IsCompleted)
    {
        X x;
        try
        {
            // blocking form, switch to TryTake and maybe Thread.Sleep()
            x = queue.Take();
        }
        catch (InvalidOperationException)
        {
            // none left
            break;
        }

        x.Frob();
    }
};

int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
阅读全文

相关推荐

最新文章