zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Java并发编程实战之基于生产者消费者模式的日志服务读书笔记

2023-09-14 09:06:43 时间

日常开发中,我们会经常跟日志系统打交道,日志系统算是典型的生产者消费者模型的系统。(多个不同的)生产者线程负责生产日志消息,这些系统扮演者生产者的角色,日志系统负责将消息队列里的日志上传到服务器,属于消费者角色。是一种多生产者单消费者模式的设计方式。如果生产者的生产速度大于LoggerThread 的消费速度,则BlockingDeque线程会阻塞生产者,直到LoggerThread 有能力处理新的消息。

不支持关闭的生产者-消费者日志服务(bug版本)

public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThread;

    public LogService() {
        //注意队列的容量为10
        queue = new LinkedBlockingQueue<>(10);
        loggerThread = new LoggerThread();
    }
    public void start() {
        loggerThread.start();
    }

    /**
     * 生产者生产消息,每个log的调用者都行相当于一个生产者
     *
     * @param msg
     * @throws InterruptedException
     */
    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    /**
     * 消费者线程消费消息,该线程扮演的是消费者这一角色。
     */
    private class LoggerThread extends Thread {

       public  LoggerThread(){
        	
        }
        public void run() {
            try {
                while (true) {//无限循环
                   System.out.println(queue.take());
                }//end while
            } catch (InterruptedException e) {//在循环外响应中断
            } finally {
            	System.out.println("log service is close");
            }
        }
      }
    }

上面代码看起来很正常,但是有如下缺点:
1、LoggerThread没有提供关闭的方法,当没有生产者生产消息的时候,那么LoggerThread在调用queue.take()的时候会一直处于阻塞状态,导致JVM没法正常关闭。比如下面的调用代码,就会发生LoggerThread无法关闭的情况。

 public static void main(String[] args) {
    	LogService logService = new LogService();
    
    	for(int i=0;i<100;i++) {
    		final int index =i;
    		new Thread(new Runnable() {

				@Override
				public void run() {
				
					try {
						logService.log("生产者生产消息===="+index);
					} catch (InterruptedException e) {
						System.out.println("response interception");
					}
					
				}
    			
    		}).start();
    	}
    	
    	logService.start();
    }

关闭LoggerThread很容易啊,只需要把LoggerThread的方法改成如下就可以了吧?

 public void run() {
            try {
                while (!queue.isEmpty()) {//判断队列是否为空,如果为空则关闭
                   System.out.println(queue.take());
                }
            } catch (InterruptedException e) {
            	System.out.println("response interception");
            } finally {
            	System.out.println("log service is close="+queue.size());
            }
        }

上面的改动中,我们将while(true)改成了while(!queue.isEmpty()).可以吗?答案是完全不可以!在我们这个demo中,队列的大小是有限制的,如果消费者的消费速度大于生产者的速度,那么队列在某一瞬间就变成空了,也就是queue.isEmpty()为true,此时消费者关闭,但是生产者可以继续生产,则队列满的情况下生产者会阻塞。取消一个生产者-消费者模型的最可靠的方法是需要同时取消生产者和消费者,因为当单独的关闭消费者的时候,会阻塞生产者;当单独的关闭生产者的时候,会阻塞消费者。其关闭原则应该是:我们应该提供一个Stop方法,调用stop方法的时候表示日志服务已经关闭,此时生产者不可以再生产消息,当消费者消费完队列里的消息后才正式关闭日志服务,避免造成日志的丢失

且看下面代码,我们为LogService提供了stop方法,并且修改了log方法,当isShutDown为true的时候,就不能在生产消息。然而这看似完美的操作却有个巨大漏洞,stop方法不是原子的,if(!isShutdown)也不是原子的,使得关闭方法并不可靠

  private boolean isShutdown=false;
    public void stop(){
        isShutdown=true;
    }
    public void log(String msg) throws InterruptedException {
        if(!isShutdown){
            queue.put(msg);
        }
    }

所以我们需要改造。使之成为原子的即可。我们改造如下

支持关闭的生产者-消费者日志服务(不安全版本)

使用synchronized关键字,将stop改造成原子的,需要注意的是,因为put方法本身就可以阻塞,所以我们不需要在消息加入队列的时候再去持有一个锁,所以我们将put方法放在了synchronized语句块的外面。


    private boolean isShutDown = false;
    public void stop(){
    	synchronized(this) { isShutDown=true;}
    }

    /**
     * 生产者生产消息,每个log的调用者都行相当于一个生产者
     *
     * @param msg
     * @throws InterruptedException
     */
    public void log(String msg) throws InterruptedException {
    	synchronized(this) {
    		if(isShutDown) {
    			return;
    		}
    	}
    	queue.put(msg);   	
    }

经过这么以改进,当我们调用stop的时候,则生产者线程不会继续生产消息。我们可以将LoggerThread改造如下:

 public void run() {
     try {
          while (true) {
          	if(isShutDown) {//如果关闭则退出
          		break;
          	}
             System.out.println(queue.take());
          }
      } catch (InterruptedException e) {//注意我们这里实在while循环外响应中断的。
      	System.out.println("response interception");
      } finally {
      	System.out.println("log service is close="+queue.size());
      }
  }

那么这样就没问题了吗?答案是问题大了去了。比如我们调用了stop方法,此时LoggerThread消费者正好在queue.take()阻塞着,那么if(isShutDown) {break}则永远不会执行,LoggerThread是无法退出的。正确的做法是我们需要再次对stop方法进行改造,改造如下:

    public void stop(){
    	synchronized(this) { isShutDown=true;}
    	loggerThread.interrupt();
    }

我们添加了loggerThread.interrupt();,此时take方法发生阻塞时会响应中断,因为我们在while循环之外响应中断的,则跳出while循环,执行打印如下:

  public static void main(String[] args) {
    	LogService logService = new LogService();
    
    	for(int i=0;i<90;i++) {
    		final int index =i;
    		Thread thread=new Thread(new Runnable() {
				@Override
				public void run() {				
					try {				
						logService.log("生产者生产消息===="+index);	
					} catch (InterruptedException e) {
						System.out.println("response interception");
					}				
				}		
    		});
    		thread.start();
    	}
    	
    	logService.start();
    	try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {		
		}
		//两秒之后关闭日志系统
    	logService.stop();
    }

执行打印如下:

response interception
log service is close=0

那么到此为止是不是就没问题了呢?当然仍然有问题,如果发生中断的时候,我们在while循环之外响应中断,消息队列中仍然有消息,我们就没法继续处理剩下的日志,那么日志服务的关闭势必造成日志的丢失,如果是比较关键的日志信息丢失了,我们都没地方哭去。

所以我们的日志服务系统仍然有优化的空间。

支持关闭的生产者-消费者日志服务(安全版本)

我们的目标是生产者生产多少消息,我们的日志系统就处理多少消息。当日志服务关闭的时候,生产者不能生产消息。消费者在处理完所有消息后,自行关闭。
为此我们继续改造,使用一个变量logCount,每调用一次log则logCount+1;注意logCount递增操作需要是原子的。
为此我们对log方法进行改造,改造如下:

    private int logCount=0;
    public void log(String msg) throws InterruptedException {
    	synchronized(this) {
    		if(isShutDown) {
    			return;
    		}
    		//消息递增
    		logCount++;
    	}
    	queue.put(msg);
    	
    }

处理完了成产者,我们需要对消费者进行改造,注意LoggerThread之前的消费者捕获InterruptedException 时,是在while循环之外,我们现在需要放在循环之内。这样我们才能正确处理中断,确保队列里的消息处理完毕,改造后的LoggerThread如下:
在这里插入图片描述
到此为止,本篇博文就结束了,在读《Java并发编程实战》这一章节的时候,还有点疑问为什么这么写。直到写完了这篇博客,边写博客边敲程序进行验证,才算是彻底了解了其中的意义。果然写博客还是很有帮助的。起码这边博客对博主自己深入理解Java并发编程,有很大帮助。

完整的代码如下:

package log;

import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

public class LogService {
	private final BlockingQueue<String> queue;
	private final LoggerThread loggerThread;

	public LogService() {
		queue = new LinkedBlockingQueue<>(10);
		loggerThread = new LoggerThread();
	}

	public void start() {
		loggerThread.start();
	}

	private boolean isShutDown = false;

	public void stop() {
		synchronized (this) {
			isShutDown = true;
		}
		loggerThread.interrupt();
	}

	private int logCount = 0;

	public void log(String msg) throws InterruptedException {
		synchronized (this) {
			if (isShutDown) {
				return;
			}
			logCount++;
		}
		queue.put(msg);

	}

	/**
	 * 消费者线程消费消息,该线程扮演的是消费者这一角色。
	 */
	private class LoggerThread extends Thread {

		public LoggerThread() {

		}

		public void run() {
			try {

				while (true) {
					try {
						
						synchronized (LogService.this) {
							if (isShutDown && logCount == 0) {
								break;
							}
						}

						System.out.println(queue.take());
						
						synchronized (LogService.this) {
							logCount--;
						}
					} catch (InterruptedException e) {
						System.out.println("相应中断,while循环继续执行,除非满足退出条件");
					}

				}//end while

			} finally {
				//资源额释放
				System.out.println("log service is close=" + queue.size());
			}
		}
	}

	public static void main(String[] args) {
		LogService logService = new LogService();

		for (int i = 0; i < 90; i++) {
			final int index = i;
			Thread thread = new Thread(new Runnable() {

				@Override
				public void run() {

					try {
						logService.log("生产者生产消息====" + index);

					} catch (InterruptedException e) {
						System.out.println("response interception");
					}

				}

			});
			thread.start();
		}

		logService.start();
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		logService.stop();
	}
}