这篇是并发集合中的最后一篇,介绍一下BlockingCollection。在工作中我还没有使用过,但是看上去应该是为了便捷使用并发集合而创建的类型。默认情况下,BlockingCollection使用的是ConcurrentQueue容器,当然我们也可以使用其他实现了IProducerConsumerCollection的类型来操作。

static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}

class CustomTask
{
public int Id { get; set; }
}

static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
{
var taskCollection = new BlockingCollection<CustomTask>();
if(collection != null)
taskCollection= new BlockingCollection<CustomTask>(collection);

var taskSource = Task.Run(() => TaskProducer(taskCollection));

Task[] processors = new Task[4];
for (int i = 1; i <= 4; i++)
{
string processorId = "Processor " + i;
processors[i - 1] = Task.Run(() => TaskProcessor(taskCollection, processorId));
}

await taskSource;

await Task.WhenAll(processors);
}

static async Task TaskProducer(BlockingCollection<CustomTask> collection)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(20);
var workItem = new CustomTask { Id = i };
collection.Add(workItem);
Console.WriteLine("Task {0} has been posted", workItem.Id);
}
collection.CompleteAdding(); //完成工作
}

static async Task TaskProcessor(BlockingCollection<CustomTask> collection, string name)
{
await GetRandomDelay();
foreach (CustomTask item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Task {0} has been processed by {1}", item.Id, name);
await GetRandomDelay();
}
}

首先调用默认的BlockingCollection:



然后我们传入一个ConcurrentStack实例

Console.WriteLine("Using a Stack inside of BlockingCollection");
Console.WriteLine();
Task t = RunProgram(new ConcurrentStack<CustomTask>());
t.Wait();



Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐