Spring WebSocket source code analysis, continued: how handlers are used
Date: 2023-09-11 14:21:40
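1. Defining the handler
Spring WebSocket supports the following message types: TextMessage, BinaryMessage, PingMessage, and PongMessage.
Messages are processed with the handler pattern. The abstract handler class is AbstractWebSocketHandler.java, which dispatches each incoming message to a type-specific hook method: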
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        if (message instanceof TextMessage) {
            handleTextMessage(session, (TextMessage) message);
        }
        else if (message instanceof BinaryMessage) {
            handleBinaryMessage(session, (BinaryMessage) message);
        }
        else if (message instanceof PongMessage) {
            handlePongMessage(session, (PongMessage) message);
        }
        else {
            throw new IllegalStateException("Unexpected WebSocket message type: " + message);
        }
    }

    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    }

    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
    }

    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
    }
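A concrete handler class, BinaryWebSocketHandler (shown as an example; the others are omitted). It rejects text messages by closing the session: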
    public class BinaryWebSocketHandler extends AbstractWebSocketHandler {

        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) {
            try {
                session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Text messages not supported"));
            }
            catch (IOException e) {
                // ignore
            }
        }
    }
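The dispatch-plus-hooks structure above can be tried out without Spring on the classpath. The following is only a minimal sketch of the pattern: Message, Text, Binary, BaseHandler, and BinaryOnlyHandler are hypothetical stand-ins invented for illustration, not Spring classes, and the real BinaryWebSocketHandler closes the session rather than recording an event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Spring's message types; not the real classes.
interface Message { }

final class Text implements Message {
    final String payload;
    Text(String payload) { this.payload = payload; }
}

final class Binary implements Message {
    final byte[] payload;
    Binary(byte[] payload) { this.payload = payload; }
}

// Base handler: one entry point that dispatches on the runtime type,
// plus no-op hooks that subclasses override selectively.
abstract class BaseHandler {
    public final void handleMessage(Message message) {
        if (message instanceof Text) {
            handleText((Text) message);
        } else if (message instanceof Binary) {
            handleBinary((Binary) message);
        } else {
            throw new IllegalStateException("Unexpected message type: " + message);
        }
    }

    protected void handleText(Text message) { }

    protected void handleBinary(Binary message) { }
}

// A binary-only handler: records binary payload sizes and rejects text.
// (Assumption: rejection is modelled as a recorded event, not a session close.)
class BinaryOnlyHandler extends BaseHandler {
    final List<String> events = new ArrayList<>();

    @Override
    protected void handleBinary(Binary message) {
        events.add("binary:" + message.payload.length);
    }

    @Override
    protected void handleText(Text message) {
        events.add("rejected-text");
    }
}
```

Callers only ever invoke handleMessage; which hook runs is decided by the message type, so a subclass overrides just the hooks it cares about.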
2. Using the handler
When StandardWebSocketClient performs the handshake with the server, it calls:
    @Override
    protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
            HttpHeaders headers, final URI uri, List<String> protocols,
            List<WebSocketExtension> extensions, Map<String, Object> attributes) {

        int port = getPort(uri);
        InetSocketAddress localAddress = new InetSocketAddress(getLocalHost(), port);
        InetSocketAddress remoteAddress = new InetSocketAddress(uri.getHost(), port);

        final StandardWebSocketSession session = new StandardWebSocketSession(headers,
                attributes, localAddress, remoteAddress);

        final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create();
        configBuilder.configurator(new StandardWebSocketClientConfigurator(headers));
        configBuilder.preferredSubprotocols(protocols);
        configBuilder.extensions(adaptExtensions(extensions));

        // The handler is wrapped in an adapter that implements the javax.websocket Endpoint.
        final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);

        Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
            @Override
            public WebSocketSession call() throws Exception {
                webSocketContainer.connectToServer(endpoint, configBuilder.build(), uri);
                return session;
            }
        };

        if (this.taskExecutor != null) {
            return this.taskExecutor.submitListenable(connectTask);
        }
        else {
            ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
            task.run();
            return task;
        }
    }
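The last branch of doHandshakeInternal is worth a closer look: the same Callable is either submitted to an executor or run inline on the calling thread. The sketch below reproduces that choice with plain java.util.concurrent types. ConnectRunner is a hypothetical helper written for this article, and it returns a plain Future instead of Spring's ListenableFuture.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical helper, not part of Spring: runs a connect task either on an
// executor (asynchronous) or inline on the calling thread (synchronous).
final class ConnectRunner {
    private final ExecutorService executor; // may be null

    ConnectRunner(ExecutorService executor) {
        this.executor = executor;
    }

    <T> Future<T> run(Callable<T> connectTask) {
        if (executor != null) {
            // Asynchronous path: the caller gets a Future immediately.
            return executor.submit(connectTask);
        }
        // Synchronous path: run() executes the task on the current thread,
        // so the returned Future is already completed when it is handed back.
        FutureTask<T> task = new FutureTask<>(connectTask);
        task.run();
        return task;
    }
}
```

Either way the caller receives a future, so calling code does not need to know whether an executor was configured.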
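In doHandshakeInternal, the line that creates the Endpoint (new StandardWebSocketHandlerAdapter(webSocketHandler, session)) uses an adapter, StandardWebSocketHandlerAdapter, which encapsulates the calls to the handler. Its onOpen method registers the message listeners: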
    @Override
    public void onOpen(final javax.websocket.Session session, EndpointConfig config) {

        this.wsSession.initializeNativeSession(session);

        if (this.handler.supportsPartialMessages()) {
            session.addMessageHandler(new MessageHandler.Partial<String>() {
                @Override
                public void onMessage(String message, boolean isLast) {
                    handleTextMessage(session, message, isLast);
                }
            });
            session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
                @Override
                public void onMessage(ByteBuffer message, boolean isLast) {
                    handleBinaryMessage(session, message, isLast);
                }
            });
        }
        else {
            session.addMessageHandler(new MessageHandler.Whole<String>() {
                @Override
                public void onMessage(String message) {
                    handleTextMessage(session, message, true);
                }
            });
            session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                @Override
                public void onMessage(ByteBuffer message) {
                    handleBinaryMessage(session, message, true);
                }
            });
        }

        session.addMessageHandler(new MessageHandler.Whole<javax.websocket.PongMessage>() {
            @Override
            public void onMessage(javax.websocket.PongMessage message) {
                handlePongMessage(session, message.getApplicationData());
            }
        });

        try {
            this.handler.afterConnectionEstablished(this.wsSession);
        }
        catch (Throwable t) {
            ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
            return;
        }
    }
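The core of onOpen is the choice between partial and whole listeners, with whole messages forwarded as a single final fragment. Here is a rough, JDK-only sketch of that adapter idea, limited to text messages. FrameHandler, WholeListener, PartialListener, HandlerAdapterSketch, and RecordingHandler are all hypothetical names made up for illustration; they only loosely mirror WebSocketHandler and javax.websocket.MessageHandler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler contract, loosely modelled on WebSocketHandler.
interface FrameHandler {
    boolean supportsPartialMessages();
    void handleText(String payload, boolean isLast);
}

// Hypothetical stand-ins for the two callback styles a container offers.
interface WholeListener { void onMessage(String message); }
interface PartialListener { void onMessage(String fragment, boolean isLast); }

// Adapter: on open, registers the listener style that matches the handler
// and translates every native callback into a call on the handler.
final class HandlerAdapterSketch {
    private final FrameHandler handler;
    WholeListener whole;      // set when the handler wants whole messages
    PartialListener partial;  // set when the handler accepts fragments

    HandlerAdapterSketch(FrameHandler handler) {
        this.handler = handler;
    }

    void onOpen() {
        if (handler.supportsPartialMessages()) {
            partial = (fragment, isLast) -> handler.handleText(fragment, isLast);
        } else {
            // A whole message is forwarded as a single, final fragment.
            whole = message -> handler.handleText(message, true);
        }
    }
}

// Example handler that records what it receives.
final class RecordingHandler implements FrameHandler {
    final List<String> received = new ArrayList<>();
    private final boolean partial;

    RecordingHandler(boolean partial) { this.partial = partial; }

    public boolean supportsPartialMessages() { return partial; }

    public void handleText(String payload, boolean isLast) {
        received.add(payload + (isLast ? "|last" : "|more"));
    }
}
```

The handler sees one uniform callback shape; only the adapter knows which listener style the container is actually driving.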
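The handleTextMessage, handleBinaryMessage, and handlePongMessage helpers called from onOpen are implemented as follows. Each one wraps the native payload in the corresponding Spring message object and passes it to the handler: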
    private void handleTextMessage(javax.websocket.Session session, String payload, boolean isLast) {
        TextMessage textMessage = new TextMessage(payload, isLast);
        try {
            this.handler.handleMessage(this.wsSession, textMessage);
        }
        catch (Throwable t) {
            ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
        }
    }

    private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) {
        BinaryMessage binaryMessage = new BinaryMessage(payload, isLast);
        try {
            this.handler.handleMessage(this.wsSession, binaryMessage);
        }
        catch (Throwable t) {
            ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
        }
    }

    private void handlePongMessage(javax.websocket.Session session, ByteBuffer payload) {
        PongMessage pongMessage = new PongMessage(payload);
        try {
            this.handler.handleMessage(this.wsSession, pongMessage);
        }
        catch (Throwable t) {
            ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
        }
    }
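All three helpers share one shape: delegate to the handler, and on any Throwable try to close the session instead of letting the exception escape into the container. The sketch below illustrates that shape only. SketchSession, SketchHandler, and SafeDispatcher are hypothetical, and the close reason format is an assumption; the actual behaviour of ExceptionWebSocketHandlerDecorator.tryCloseWithError should be checked against the Spring source.

```java
// Hypothetical minimal session; only what the sketch needs.
final class SketchSession {
    boolean open = true;
    String closeReason;

    void close(String reason) {
        open = false;
        closeReason = reason;
    }
}

// Hypothetical handler contract that may throw, like WebSocketHandler methods.
interface SketchHandler {
    void handle(SketchSession session, String payload) throws Exception;
}

final class SafeDispatcher {
    // Delegates to the handler; any Throwable closes the session instead of
    // propagating into the container's callback thread.
    static void dispatch(SketchHandler handler, SketchSession session, String payload) {
        try {
            handler.handle(session, payload);
        } catch (Throwable t) {
            tryCloseWithError(session, t);
        }
    }

    // Assumption: close with an error reason only if the session is still open.
    static void tryCloseWithError(SketchSession session, Throwable error) {
        if (session.open) {
            try {
                session.close("SERVER_ERROR: " + error.getMessage());
            } catch (Throwable ignored) {
                // Closing is best effort; nothing more can be done here.
            }
        }
    }
}
```

Catching Throwable at this boundary keeps a misbehaving handler from breaking the container thread that delivered the message.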