zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

System|网络|Naive SEDA实现

2023-03-15 22:01:51 时间

SEDA是流水线化的事件驱动模型,能够异步地执行服务。和直接用事件驱动模型相比,SEDA更加去中心化与模块化。之前看了SEDA的论文,干脆拿Java NIO自己手撸个玩具实现吧

架构

Accept使用最基本的Reactor模型同步处理,Read则先通过Reactor同步获得数据,然后数据通过SEDA异步处理。这里的数据必须同步获得,不然因为数据没有被读取,会产生大量重复异步事件(踩坑)

Event

public class Event {
    enum Type{Read,Write,ReadRepsonse,WriteResponse};
    public Type type;
    public SelectionKey key;
    public String Packet;
    public Event(SelectionKey key,Type type){
        this.type = type;
        this.key = key;
    }
}

懒得写OOP了,字段肯定不同。这里把所有类型的字段做了并集简易实现。

数据获取

                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // Clear out our read buffer so it's ready for new data
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    // Attempt to read off the channel
                    int numRead;
                    try {
                        numRead = socketChannel.read(readBuffer);
                    } catch (IOException ex) {
                        // The remote forcibly closed the connection, cancel
                        // the selection key and close the channel.
                        key.cancel();
                        socketChannel.close();
                        return;
                    }
                    String str = new String(readBuffer.array(), 0, numRead);
                    Event event = new Event(key, Event.Type.Read);
                    event.Packet = str;
                    System.out.println("Sync "+str);
                    StageMap.getInstance().stageMap.get("read").Enqueue(event);
                }

获取数据后,向ReadStage(维护单例表进行查找)的阻塞队列发送包含数据的事件,这个过程是完全同步的,也就是正常的Reactor。

Stage

@Override
    public void Enqueue(Event e){
        synchronized (lock){
            Runnable task;
            BatchingQ.offer(e);
            if(BatchingQ.size() == BatchSize){
                ArrayList<Event> elist = new ArrayList<>();
                try {
                    for (int i = 0; i < BatchSize; i++) {
                        elist.add(i,BatchingQ.poll());
                    }
                }catch(Exception ex){
                    //do nothing
                }
                task = new HandleThread(elist);
                pool.execute(task);
            }
        }
    }

Stage继承相同的接口,实现Enqueue操作。当队列长度达到BatchSize时,从线程池抽出一个线程异步地处理Batching。到这里,数据的处理过程已经完全和Reactor负责的数据获取过程异步执行了。

    class HandleThread implements Runnable{
        ArrayList<Event> elist;
        public HandleThread(ArrayList<Event> elist){
            this.elist = elist;
        }
        @Override
        public void run() {
            try {
                Event e;
                for (int i = 0; i < BatchSize; i++) {
                    e = elist.get(i);
                    if(e.type == Event.Type.Read){
                        System.out.println("Async " + e.Packet);
                        Event event = new Event(e.key, Event.Type.ReadRepsonse);
                        event.Packet = e.Packet;
                        StageMap.getInstance().stageMap.get("app").Enqueue(event);
                    }
                }
            }catch(Exception e){
                //do nothing
            }
        }
    }

随后,ReadStage处理完Read事件(简单的Print)后发出ReadResponse事件,要求AppStage进一步处理。这就是事件驱动,不同stage之间完全是异步的,只有事件enqueue是同步的。

AppStage执行的操作也很简单,还是打印,毕竟就是个demo。

WriteStage就是NIO代码复制粘贴,send Client。

        @Override
        public void run() {
            try {
                Event e;
                for (int i = 0; i < BatchSize; i++) {
                    e = elist.get(i);
                    if(e.type == Event.Type.Write){
                        System.out.println("Write " + e.Packet);
                        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
                        SocketChannel channel = (SocketChannel) e.key.channel();
                        sendBuffer.clear();
                        sendBuffer.put(e.Packet.getBytes());
                        sendBuffer.flip();
                        channel.write(sendBuffer);
                        Event event = new Event(e.key, Event.Type.WriteResponse);
                        StageMap.getInstance().stageMap.get("app").Enqueue(event);
                    }
                }
            }catch(Exception e){
                //do nothing
            }
        }

结果

Batching

简单地把请求搁置到队列长度>5再进行批处理。

代码见https://github.com/sjtuzwj/SEDA-Sample