zl程序教程

您现在的位置是:首页 >  其他

当前栏目

SSE 第二篇

2023-02-19 12:19:44 时间

上一篇文章只是简单帮大家梳理一下什么是SSE。这篇文章,则会放上真实Spring框架对SSE的封装了。框架封装了send方法,我们可以通过业务主动去给客户端推送事件。

我本来考虑实现服务器宕机重启后,SSE请求对象保持原有不变,实现前端SSE重连。但是经过实际操作,以及思考后,我发现此方案不能解决此问题。因为响应对象存储在服务端的JUC包下的Map中。我们无法通过Redis存储信息,然后重新获取原来的响应对象。SSE在服务端的响应对象与Session机制类似。也就是无法跨服务使用!所以,我们压根就不用考虑这个问题。这对于SSE来说是个伪需求!

先放上前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE,服务器主动触发消息</title>
</head>
<body>
<h1>开发者模式,查看控制台、以及网络可以查看相关输出</h1>
消息类型是message消息:
<div id="ssediv">默认消息</div>
<br>
消息类型是diyEventType消息:
<div id="diyssediv">DIY SSE消息</div>
<br>
<br>
<br>
<br>
<div id="diybutton">
    <button type="button" onclick="connectSSE()">主动连接SSE服务器!</button>
    <button type="button" onclick="closeSSE()">关闭SSE连接!</button>
    <button type="button" onclick="diyclick()">点我模拟服务器发送消息!</button>
</div>
</body>
<script>
    var sse = new EventSource("http://localhost:8089/sse-plus");

    /**
     * 默认是没有指定eventTtpe的消息,但eventType就为message。
     * 等价于addEventListener("message" ...
     */
    sse.onmessage = function (ev) {
        console.info("这里只能处理eventType为message的消息")
        var elementById = document.getElementById("ssediv");
        elementById.innerHTML = ev.data;
    }

    /**
     * 添加指定类型消息处理,eventType是后台自定义的
     */
    sse.addEventListener("diyEventType", event => {
        console.info("自定义事件" + event.data)
        var elementById2 = document.getElementById("diyssediv");
        elementById2.innerHTML = event.data;
    })

    /**
     * SSE连接异常
     */
    sse.onerror = function () {
        alert("服务器已停止!")
    }

    /**
     * SSE连接成功
     */
    sse.onopen = function () {
        alert("服务器已连接!")
    }

    // 不要忘记关闭断开连接哦
    // sse.close()
</script>
<script>
    function diyclick() {
        var xmlHttpRequest = new XMLHttpRequest();
        xmlHttpRequest.open("get", "http://localhost:8089/sendMessage")
        xmlHttpRequest.send()
    }

    // 关闭SSE
    function closeSSE() {
        sse.close()
        console.info("服务器已关闭!")
    }

    // 连接SSE
    function connectSSE() {
        sse.close()
        sse = new EventSource("http://localhost:8089/sse-plus");
        sse.addEventListener("diyEventType", event => {
            console.info("自定义事件" + event.data)
            var elementById2 = document.getElementById("diyssediv");
            elementById2.innerHTML = event.data;
        })
        /**
         * 默认是没有指定eventTtpe的消息,但eventType就为message。
         * 等价于addEventListener("message" ...
         */
        sse.onmessage = function (ev) {
            console.info("这里只能处理eventType为message的消息")
            var elementById = document.getElementById("ssediv");
            elementById.innerHTML = ev.data;
        }
        /**
         * SSE连接异常
         */
        sse.onerror = function () {
            console.info("服务器已停止!")
        }

        /**
         * SSE连接成功
         */
        sse.onopen = function () {
            console.info("服务器已连接!")
        }
    }
</script>
</html>

粘贴Java代码

    private static Integer sendTimes = 0;
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    @GetMapping(value = "/sse-plus")
    @ResponseBody
    public SseEmitter SseEmitter(HttpServletResponse response) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常,需在全局异常捕获:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L); // 单位ms,如果你设定了,会自动断开。如果前端有自动重试机制,间歇断开可减少连接被长久占用。
        response.setContentType("text/event-stream");   // 指定ContentType,不可变
        response.setCharacterEncoding("utf-8");         // 指定响应字符集,不可变,经测试非UTF-8则会中文乱码,但建议指定utf-8
        String clientId = UUID.randomUUID().toString();
        // 注册回调
        //  >> 回调1:长链接完成后回调接口(即关闭连接时调用)
        sseEmitter.onCompletion(() -> {
            sseCache.remove(clientId);
            log.info("SSE onCompletion: {}连接关闭时触发", clientId);
        });
        //  >> 回调2:出现异常会调用此方法
        sseEmitter.onError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                sseCache.remove(clientId);
                log.info("SSE onError:{}出现异常", clientId);
            }
        });
        //  >> 回调3:出现连接超时,会调用此方法
        sseEmitter.onTimeout(() -> {
            sseCache.remove(clientId);
            log.error("SSE onTimeout:{}超时了", clientId);
        });
        sseCache.put(clientId, sseEmitter);
        log.info("创建新的sse连接,当前用户:{}", clientId);
        try {
            sseEmitter.send(SseEmitter.event().id(clientId).name("diyEventType").data("连接成功" + clientId));
        } catch (IOException e) {
            log.error("SSE: 给客户端发送消息异常,客户端ID:{}", clientId, e);
            throw new RuntimeException("给客户端发送消息异常!", e);
        }
        return sseEmitter;
    }

    /**
     * 长链接完成后回调接口(即关闭连接时调用)
     *
     * @param clientId 客户端ID
     **/
    private Runnable completionCallBack(String clientId) {
        return () -> {
            log.info("结束连接:{}", clientId);
        };
    }

    // 创建一个线程池,用于处理大批量用户掉线的问题
    private static ExecutorService executorService = new ThreadPoolExecutor(3,
            5,
            60L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(50),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()); // 采用阻塞队列。不丢任何消息

    /**
     * 缺陷:此处发送消息时候,如果用户连接失效,服务器实际无法感知,只能通过再次调用send出现异常时候来判断用户已经断联
     * 如果采用重试机制,很容易造成阻塞。如果用户体量很大,建议采用MQ的方式,将消息甩到MQ。剩下由其他服务或线程来处理。
     *
     * @return
     */
    @GetMapping(value = "/sendMessage")
    @ResponseBody
    public String sendMessage() {
        TimeInterval timer = DateUtil.timer();
        sendTimes++;
        // 这里从缓存中拿到sse对象,调用send方法实现主动推送
        sseCache.forEach((clientId, sseEmitter) -> {
            try {
                // 建议每次发送都以新Obj:此对象是builder,调用一次,相当于append一次!
                SseEmitter.SseEventBuilder data = SseEmitter.event()
                        .reconnectTime(5000)
                        .name("diyEventType");
                // 直接调用对象的send方法就可以,自定义主动推送消息了
                data.id(clientId)
                        .data("宝贝Aa1? " + clientId + " : " + sendTimes, MediaType.APPLICATION_JSON)
                        .data("</br>当前连接用户数: " + sseCache.size())
                        .data("</br>发送本次耗时:" + timer.interval());
                sseEmitter.send(data);
            } catch (IOException e) {
                executorService.execute(() -> {
                    // 推送消息失败后,每隔3s推送一次,推送5次。如果不使用线程池,就会导致发消息时,重试机制导致其他用户消息无法处理!
                    for (int i = 0; i < 5; i++) {
                        try {
                            Thread.sleep(3000);
                            if (sseEmitter == null) {
                                log.error("消息推送出现异常:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
                                continue;
                            }
                            // 这里data非全局消息,想办法抽离出去即可 sseEmitter.send(data);
                        } catch (Exception ex) {
                            log.error("消息推送出现异常:{}的第{}次消息重推失败", clientId, i + 1, ex);
                            continue;
                        }
                        if (i == 4) {
                            sseCache.remove(clientId);
                            log.error("由于用户{},消息推送老是失败,则不再尝试推送消息!", clientId);
                        }
                        //log.info("消息推送出现异常:{}的第{}次消息重推成功,{}", clientId, i + 1, data);
                        return;
                    }
                });

            }
        });
        return "消息推送成功!";
    }

特殊说明: 以上文章,均是我实际操作,写出来的笔记资料,不会盗用别人文章!烦请各位,请勿直接盗用!转载记得标注来源!