zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

谈disruptor的单线程数据库操作

数据库 操作 单线程 Disruptor
2023-09-11 14:20:54 时间
对远程数据库的操作,采用disruptor能够很好解决死锁, 首先是定义一个抽象类,实现Runnable接口
public final static EventFactory TaskEvent EVENT_FACTORY = new EventFactory TaskEvent () { public TaskEvent newInstance() { return new TaskEvent(); public class TaskEventHandler implements EventHandler TaskEvent { //  执行接口函数onEvent执行 public void onEvent(TaskEvent event, long sequence, boolean endOfBatch) throws Exception { event.getTask().run(); import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.zhenhai.bonecp.CustomThreadFactory; import com.zhenhai.disruptor.BatchEventProcessor; import com.zhenhai.disruptor.RingBuffer; import com.zhenhai.disruptor.SequenceBarrier; import com.zhenhai.disruptor.YieldingWaitStrategy; import com.zhenhai.disruptor.dsl.ProducerType; *     使用方法 DisruptorHelper.initAndStart(); Task tt=new Taska(); DisruptorHelper.produce(tt); DisruptorHelper.shutdown(); public class DisruptorHelper { * ringbuffer容量,最好是2的N次方 private static final int BUFFER_SIZE = 1024 * 1; private static int group=2; private RingBuffer TaskEvent ringBuffer[]; private SequenceBarrier sequenceBarrier[]; private TaskEventHandler handler[]; private BatchEventProcessor TaskEvent batchEventProcessor[]; private  static DisruptorHelper instance; private static boolean inited = false; private static ScheduledExecutorService taskTimer=null; //JDK 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 private    ExecutorService execute[]; //启动监视线程 static { System.out.println("init DisruptorHelper!!!!!!!!!!!!!!!!!"); instance = new DisruptorHelper(); instance.init(); inited = true; System.out.println("init DisruptorHelper end!!!!!!!!!!!!!!!!!"); * 静态类 * @return private DisruptorHelper(){ } * 初始化 private void init(){ execute=new ExecutorService[group]; ringBuffer=new RingBuffer[group]; sequenceBarrier=new SequenceBarrier[group]; handler=new TaskEventHandler[group]; batchEventProcessor=new BatchEventProcessor[group]; ////////////////定时执行//////////////// //初始化ringbuffer,存放Event for(int i=0;i group;i++){ ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy()); sequenceBarrier[i] = ringBuffer[i].newBarrier(); handler[i] = new TaskEventHandler(); batchEventProcessor[i] = new BatchEventProcessor TaskEvent (ringBuffer[i], sequenceBarrier[i], handler[i]); ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence()); execute[i]= Executors.newSingleThreadExecutor(); execute[i].submit(instance.batchEventProcessor[i]); this.taskTimer =  Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true)); inited = true; * 执行定时器 * @param tk private void produce(int index,Task tk){ //System.out.println("index:="+index); if(index 0||index =group) { System.out.println("out of group index:="+index); return; // if capacity less than 10%, dont use ringbuffer anymore System.out.println("capacity:="+ringBuffer[index].remainingCapacity()); if(ringBuffer[index].remainingCapacity() BUFFER_SIZE * 0.1) { System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %"); // do something }else { long sequence = ringBuffer[index].next(); //将状态报告存入ringBuffer的该序列号中 ringBuffer[index].get(sequence).setTask(tk); //通知消费者该资源可以消费 ringBuffer[index].publish(sequence); * 获得容器的capacity的数量 * @param index * @return private long  remainingcapacity(int index){ //System.out.println("index:="+index); if(index 0||index =group) { System.out.println("out of group index:="+index); return 0L; long capacity= ringBuffer[index].remainingCapacity(); return capacity; private void shutdown0(){ for(int i=0;i group;i++){ execute[i].shutdown(); ////////////////////////////////下面是静态方法提供调用//////////////////////////////////////////////////////// * 直接消费 * @param tk public static void addTask(int priority,Task tk){ instance.produce(priority,tk); * 定时消费 * @param tk * @param delay * @param period public static void scheduleTask(int priority,Task tk,long delay,long period){ Runnable timerTask = new ScheduledTask(priority, tk); taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS); * 定点执行 * @param tk * @param hourse * @param minus * @param sec * @return public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec) Runnable timerTask = new ScheduledTask(priority, tk); //每天2:30分执行 long delay = Helper.calcDelay(hourse,minus,sec); long period = Helper.ONE_DAY; System.out.println("delay:"+(delay/1000)+"secs"); taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS); return timerTask; //对定时执行的程序进行分装 private static class ScheduledTask implements Runnable private int priority; private Task task; ScheduledTask(int priority, Task task) this.priority = priority; this.task = task; public void run() instance.produce(priority,task); }catch(Exception e){ System.out.println("catch exception in DisruptorHelper!"); public static long getRemainingCapatiye(int index){ return instance.getRemainingCapatiye(index); public static void shutdown(){ if(!inited){ throw new RuntimeException("Disruptor还没有初始化!"); instance.shutdown0(); 最新内容请见作者的GitHub页:http://qaseven.github.io/
高性能无锁并发框架Disruptor,太强了 Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单
高性能无锁并发框架Disruptor,太强了! Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单官网:lmax-exchange.github.io/disruptor/目前,包括Apache Storm、Camel、Log4j2在内的很多知名项目都应用了Disruptor以获取高性能为什么会产生Disruptor框架「目前Java内置队列保证线程安全的方式:」ArrayBlockingQueue:基于数组形式的队列,通过加锁的方式,来保证多线程情况下数据的安全;LinkedBlockingQue基于链表形式
【高并发】线程的执行顺序没你想的那么简单 调用Thread的start()方法启动线程时,线程的执行顺序是不确定的。也就是说,在同一个方法中,连续创建多个线程后,调用线程的start()方法的顺序并不能决定线程的执行顺序。
并发与并行 同步或异步 我们都知道,程序猿是一种逻辑性极强的生物,他们不擅言辞,不擅表达,但是他们能够用一种神秘的语言与机器进行沟通,知道怎么让机器听他们的。
高并发之——线程的执行顺序 调用Thread的start()方法启动线程时,线程的执行顺序是不确定的。也就是说,在同一个方法中,连续创建多个线程后,调用线程的start()方法的顺序并不能决定线程的执行顺序。