深入多线程之:深入生产者、消费者队列分析
上次我们使用AutoResetEvent实现了一个生产/消费者队列。这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行。
我们使用数组来跟踪线程。
通过跟踪线程可以让我们在所有的线程都结束后再结束我们的队列任务。
每一个消费者线程都执行一个叫做Consume的方法,在一个for循环中,我们可以创建和启动线程。例如:
publicPCQueue(intworkerCount)
{
_workers=newThread[workerCount];
for(inti=0;i<workerCount;i++)
(_workers[i]=newThread(Consume)).Start();
}
上次我们使用的是一个字符串来代表任务,这次我们使用Action委托,它的定义如下:
为了表示一系列的任务,我们使用Queue<T>集合,例如:
在我们调用生产(EnqueueItem)和消费(Consume)方法前,还是完整的看一看代码吧:
classPCQueue
{
readonlyobject_locker=newobject();
Thread[]_workers;
Queue<Action>_itemQ=newQueue<Action>();//保存任务的队列
publicPCQueue(intworkerCount)
{
_workers=newThread[workerCount];
for(inti=0;i<workerCount;i++)
(_workers[i]=newThread(Consume)).Start();
}
publicvoidShutdown(boolwaitForWorkers)
{
//为每一个线程插入一个nullitem,可以是每一个worker退出
foreach(Threadworkerin_workers)
EnqueueItem(null);
//等待所有的线程退出。
if(waitForWorkers)
foreach(Threadworkerin_workers)
worker.Join();
}
publicvoidEnqueueItem(Actionitem)
{
lock(_locker)
{
_itemQ.Enqueue(item);
Monitor.Pulse(_locker);//通知等待队列中的线程
}
}
voidConsume()
{
while(true)
{
Actionitem;
lock(_locker)
{
while(_itemQ.Count==0)
{
Monitor.Wait(_locker);//释放锁,并阻止当前线程,直到其他线程发送pulse信号。 }
item=_itemQ.Dequeue();
}
if(item==null)return;//退出的信号
item();
}
}
}
我们可以有一个退出策略,插入一个nullitem作为consumer退出的信号。如果我们想要快速的退出,可以使用一个独立的”cancel”标记,因为我们支持多个consumers,所以我们必须为每一个consumer插入一个nullitem。
下面是Main方法。
publicstaticvoidMain()
{
PCQueueq=newPCQueue(2);
Console.WriteLine("Enqueuing10items...");
for(inti=0;i<10;i++)
{
intitemNumber=i;
q.EnqueueItem(()=>
{
Thread.Sleep(1000);//模拟耗时的工作
Console.WriteLine("Task"+itemNumber);
});
}
q.Shutdown(true);//等待关闭
Console.WriteLine();
Console.WriteLine("Workerscomplete!");
}
下面让我们细致的看一看EnqueueItem方法:
publicvoidEnqueueItem(Actionitem)
{
lock(_locker)
{
_itemQ.Enqueue(item);
Monitor.Pulse(_locker);//通知等待队列中的线程
}
}
因为我们的队列_itemQ被多线程环境使用,因此在对_itemQ进行读取的时候需要加锁lock.
因为我们插入了一个新的任务,我们必须修改阻塞条件,也就是调用pulse方法,来唤醒调用了wait方法的线程。
出于对效率的考虑,当插入一个Item的时候使用Pulse来代替PulseAll方法,因为大部分时候每一个Item只需要一个consumer来执行。如果你有一个冰淇淋,你不可能叫30个睡眠的孩子都起来吃它,同样,对于一个item,同时唤醒30个consumers一点好处都没有。
我们希望当没什么事情做的时候,线程阻塞就可以了,换句话说,队列中没有item的时候,线程就应该阻塞。因此我们的阻塞条件是_itemQ.Count==0;
Actionitem;
lock(_locker)
{
while(_itemQ.Count==0)
{
Monitor.Wait(_locker);//释放锁,并阻止当前线程,直到其他线程发送pulse信号。 }
item=_itemQ.Dequeue();
}
if(item==null)return;//退出的信号
item();
while循环退出的时候也意味着_itemQ至少有一个item。我们必须在释放锁之前调用你哦个dequeue方法来获取item,考虑下下面的代码:
lock(_locker)
{
while(_itemQ.Count==0)
{
Monitor.Wait(_locker);//释放锁,并阻止当前线程,直到其他线程发送pulse信号。 }
}
//现在在这里可能被抢占,_itemQ可能被修改
lock(_locker)
{
item=_itemQ.Dequeue();
}
在item被Dequeued后,我们就应该立即释放锁了,如果我们在执行task的时候,一直持有锁,则会没有必要的阻塞其他线程来获取任务。
在调用Wait方法的时候可以传递一个毫秒或Timespan的时间来设置超时。如果Wait超时了,那么Wait方法就会返回false。
带有超时功能的Wait方法的主要步骤:
阻塞直到pulsed或者超时。
重新获取锁。
超时就好像CLR在超时到了的时候自动的调用了pulse方法一样。
下面是使用超时的Wait的主要代码:
lock(_locker)
while(<阻塞条件>)
Monitor.Wait(_locker,<超时时间>);
Monitor.Wait方法返回一个bool值来代表是调用了pulse还是已经超时了。
这对记录日志很有用。
相关文章
- python deepcopy函数实现_python 多线程
- java多线程并发之旅-14-lock free queue 无锁队列[通俗易懂]
- 多线程CAS机制(图解)
- 16. 多线程案例(4)——线程池
- Java多线程
- PyQt5渐变圆环水波进度条+透明淡入(多线程信号)
- Python多线程结合队列下载百度音乐代码详解编程语言
- python 多线程优先队列Queue详解编程语言
- Linux下的多线程编程之旅(linux多线程)
- Linux下多线程聊天:提高工作效率还能缓解寂寞(linux多线程聊天)
- 深入理解Redis多线程技术(怎么理解redis多线程)
- 多线程智能化管理Redis队列(多线程redis队列)
- 利用多线程操纵Redis数据库(多线程redis操作)
- Redis过期极速多线程为您服务(redis过期 多线程)
- 使用Redis和多线程有效预防过期(redis过期 多线程)
- java多线程和并发包入门示例
- Python中多线程thread与threading的实现方法