System|网络|Naive SEDA实现
2023-03-15 22:01:51 时间
SEDA是流水线化的事件驱动模型,能够异步地执行服务。和直接用事件驱动模型相比,SEDA更加去中心化与模块化。之前看了SEDA的论文,干脆拿Java NIO自己手撸个玩具实现吧
架构
Accept使用最基本的Reactor模型同步处理,Read则先通过Reactor同步获得数据,然后数据通过SEDA异步处理。这里的数据必须同步获得,不然因为数据没有被读取,会产生大量重复异步事件(踩坑)
Event
public class Event {
enum Type{Read,Write,ReadRepsonse,WriteResponse};
public Type type;
public SelectionKey key;
public String Packet;
public Event(SelectionKey key,Type type){
this.type = type;
this.key = key;
}
}
懒得写OOP了,字段肯定不同。这里把所有类型的字段做了并集简易实现。
数据获取
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException ex) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
String str = new String(readBuffer.array(), 0, numRead);
Event event = new Event(key, Event.Type.Read);
event.Packet = str;
System.out.println("Sync "+str);
StageMap.getInstance().stageMap.get("read").Enqueue(event);
}
获取数据后,向ReadStage(维护单例表进行查找)的阻塞队列发送包含数据的事件,这个过程是完全同步的,也就是正常的Reactor。
Stage
@Override
public void Enqueue(Event e){
synchronized (lock){
Runnable task;
BatchingQ.offer(e);
if(BatchingQ.size() == BatchSize){
ArrayList<Event> elist = new ArrayList<>();
try {
for (int i = 0; i < BatchSize; i++) {
elist.add(i,BatchingQ.poll());
}
}catch(Exception ex){
//do nothing
}
task = new HandleThread(elist);
pool.execute(task);
}
}
}
Stage继承相同的接口,实现Enqueue操作。当队列长度达到BatchSize时,从线程池抽出一个线程异步地处理Batching。到这里,数据的处理过程已经完全和Reactor负责的数据获取过程异步执行了。
class HandleThread implements Runnable{
ArrayList<Event> elist;
public HandleThread(ArrayList<Event> elist){
this.elist = elist;
}
@Override
public void run() {
try {
Event e;
for (int i = 0; i < BatchSize; i++) {
e = elist.get(i);
if(e.type == Event.Type.Read){
System.out.println("Async " + e.Packet);
Event event = new Event(e.key, Event.Type.ReadRepsonse);
event.Packet = e.Packet;
StageMap.getInstance().stageMap.get("app").Enqueue(event);
}
}
}catch(Exception e){
//do nothing
}
}
}
随后,ReadStage处理完Read事件(简单的Print)后发出ReadResponse事件,要求AppStage进一步处理。这就是事件驱动,不同stage之间完全是异步的,只有事件enqueue是同步的。
AppStage执行的操作也很简单,还是打印,毕竟就是个demo。
WriteStage就是NIO代码复制粘贴,send Client。
@Override
public void run() {
try {
Event e;
for (int i = 0; i < BatchSize; i++) {
e = elist.get(i);
if(e.type == Event.Type.Write){
System.out.println("Write " + e.Packet);
ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) e.key.channel();
sendBuffer.clear();
sendBuffer.put(e.Packet.getBytes());
sendBuffer.flip();
channel.write(sendBuffer);
Event event = new Event(e.key, Event.Type.WriteResponse);
StageMap.getInstance().stageMap.get("app").Enqueue(event);
}
}
}catch(Exception e){
//do nothing
}
}
结果
Batching
简单地把请求搁置到队列长度>5再进行批处理。
相关文章
- 美团二面:考我幻读,结果答的不好
- 浅谈深度学习:如何计算模型以及中间变量的显存占用大小
- 你的技术栈中不能少云数据库
- 面对千万级数据查询,CK、ES、RediSearch谁才是王炸?
- 我们一起聊聊 Oracle 的Lgwr Worker
- MySQL 不同隔离级别,都使用了什么锁?
- 256变4096:分库分表扩容如何实现平滑数据迁移?
- MySQL全文索引、支持中文分词
- LIMIT和OFFSET分页性能差!今天来介绍如何高性能分页
- GraphQL为何不是数据库查询方面的行业标准?
- 六个SQL查询技巧,你知道几个?
- 快速评估图数据库何时使用:与关系型数据库简要对比,离图更进一步
- SQL Server 备份的备份类型探究
- 为什么阿里巴巴禁止数据库中做多表join?
- 关系型、非关系型数据库存储选型盘点大全
- Redis 的数据被删除,内存占用还这么大?
- MySQL 崩溃恢复过程分析
- 什么?MySQL 8.0 会同时修改两个ib_logfilesN 文件?
- 程序员应知应会之数据库设计的那些事儿
- 知识图谱与图数据库的关系,终于有人讲明白了