【SpringBoot WEB 系列】SSE 服務器發送事件詳解


【SpringBoot WEB系列】SSE 服務器發送事件詳解

SSE 全稱Server Sent Event,直譯一下就是服務器發送事件,一般的項目開發中,用到的機會不多,可能很多小伙伴不太清楚這個東西,到底是干啥的,有啥用

本文主要知識點如下:

  • SSE 掃盲,應用場景分析
  • 借助異步請求實現 sse 功能,加深概念理解
  • 使用SseEmitter實現一個簡單的推送示例

I. SSE 掃盲

對於 sse 基礎概念比較清楚的可以跳過本節

1. 概念介紹

sse(Server Sent Event),直譯為服務器發送事件,顧名思義,也就是客戶端可以獲取到服務器發送的事件

我們常見的 http 交互方式是客戶端發起請求,服務端響應,然后一次請求完畢;但是在 sse 的場景下,客戶端發起請求,連接一直保持,服務端有數據就可以返回數據給客戶端,這個返回可以是多次間隔的方式

2. 特點分析

SSE 最大的特點,可以簡單規划為兩個

  • 長連接
  • 服務端可以向客戶端推送信息

了解 websocket 的小伙伴,可能也知道它也是長連接,可以推送信息,但是它們有一個明顯的區別

sse 是單通道,只能服務端向客戶端發消息;而 webscoket 是雙通道

那么為什么有了 webscoket 還要搞出一個 sse 呢?既然存在,必然有着它的優越之處

sse websocket
http 協議 獨立的 websocket 協議
輕量,使用簡單 相對復雜
默認支持斷線重連 需要自己實現斷線重連
文本傳輸 二進制傳輸
支持自定義發送的消息類型 -

3. 應用場景

從 sse 的特點出發,我們可以大致的判斷出它的應用場景,需要輪詢獲取服務端最新數據的 case 下,多半是可以用它的

比如顯示當前網站在線的實時人數,法幣匯率顯示當前實時匯率,電商大促的實時成交額等等...

II. 手動實現 sse 功能

sse 本身是有自己的一套玩法的,后面會進行說明,這一小節,則主要針對 sse 的兩個特點長連接 + 后端推送數據,如果讓我們自己來實現這樣的一個接口,可以怎么做?

1. 項目創建

借助 SpringBoot 2.2.1.RELEASE來創建一個用於演示的工程項目,核心的 xml 依賴如下

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </pluginManagement>
</build>
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <name>Spring Snapshots</name>
        <url>https://repo.spring.io/libs-snapshot-local</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/libs-milestone-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>spring-releases</id>
        <name>Spring Releases</name>
        <url>https://repo.spring.io/libs-release-local</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

2. 功能實現

在 Http1.1 支持了長連接,請求頭添加一個Connection: keep-alive即可

在這里我們借助異步請求來實現 sse 功能,至於什么是異步請求,推薦查看博文: 【WEB 系列】異步請求知識點與使用姿勢小結

因為后端可以不定時返回數據,所以我們需要注意的就是需要保持連接,不要返回一次數據之后就斷開了;其次就是需要設置請求頭Content-Type: text/event-stream;charset=UTF-8 (如果不是流的話會怎樣?)

// 新建一個容器,保存連接,用於輸出返回
private Map<String, PrintWriter> responseMap = new ConcurrentHashMap<>();

// 發送數據給客戶端
private void writeData(String id, String msg, boolean over) throws IOException {
    PrintWriter writer = responseMap.get(id);
    if (writer == null) {
        return;
    }

    writer.println(msg);
    writer.flush();
    if (over) {
        responseMap.remove(id);
    }
}

// 推送
@ResponseBody
@GetMapping(path = "subscribe")
public WebAsyncTask<Void> subscribe(String id, HttpServletResponse response) {

    Callable<Void> callable = () -> {
        response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
        responseMap.put(id, response.getWriter());
        writeData(id, "訂閱成功", false);
        while (true) {
            Thread.sleep(1000);
            if (!responseMap.containsKey(id)) {
                break;
            }
        }
        return null;
    };

    // 采用WebAsyncTask 返回 這樣可以處理超時和錯誤 同時也可以指定使用的Excutor名稱
    WebAsyncTask<Void> webAsyncTask = new WebAsyncTask<>(30000, callable);
    // 注意:onCompletion表示完成,不管你是否超時、是否拋出異常,這個函數都會執行的
    webAsyncTask.onCompletion(() -> System.out.println("程序[正常執行]完成的回調"));

    // 這兩個返回的內容,最終都會放進response里面去===========
    webAsyncTask.onTimeout(() -> {
        responseMap.remove(id);
        System.out.println("超時了!!!");
        return null;
    });
    // 備注:這個是Spring5新增的
    webAsyncTask.onError(() -> {
        System.out.println("出現異常!!!");
        return null;
    });


    return webAsyncTask;
}

看一下上面的實現,基本上還是異步請求的那一套邏輯,請仔細看一下callable中的邏輯,有一個 while 循環,來保證長連接不中斷

接下來我們新增兩個接口,用來模擬后端給客戶端發送消息,關閉連接的場景

@ResponseBody
@GetMapping(path = "push")
public String pushData(String id, String content) throws IOException {
    writeData(id, content, false);
    return "over!";
}

@ResponseBody
@GetMapping(path = "over")
public String over(String id) throws IOException {
    writeData(id, "over", true);
    return "over!";
}

我們簡單的來演示下操作過程

III. SseEmitter

上面只是簡單實現了 sse 的長連接 + 后端推送消息,但是與標准的 SSE 還是有區別的,sse 有自己的規范,而我們上面的實現,實際上並沒有管這個,導致的問題是前端按照 sse 的玩法來請求數據,可能並不能正常工作

1. sse 規范

在 html5 的定義中,服務端 sse,一般需要遵循以下要求

請求頭

開啟長連接 + 流方式傳遞

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

數據格式

服務端發送的消息,由 message 組成,其格式如下:

field:value\n\n

其中 field 有五種可能

  • 空: 即以:開頭,表示注釋,可以理解為服務端向客戶端發送的心跳,確保連接不中斷
  • data:數據
  • event: 事件,默認值
  • id: 數據標識符用 id 字段表示,相當於每一條數據的編號
  • retry: 重連時間

2. 實現

SpringBoot 利用 SseEmitter 來支持 sse,可以說非常簡單了,直接返回SseEmitter對象即可;重寫一下上面的邏輯

@RestController
@RequestMapping(path = "sse")
public class SseRest {
    private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
    @GetMapping(path = "subscribe")
    public SseEmitter push(String id) {
        // 超時時間設置為1小時
        SseEmitter sseEmitter = new SseEmitter(3600_000L);
        sseCache.put(id, sseEmitter);
        sseEmitter.onTimeout(() -> sseCache.remove(id));
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }

    @GetMapping(path = "push")
    public String push(String id, String content) throws IOException {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.send(content);
        }
        return "over";
    }

    @GetMapping(path = "over")
    public String over(String id) {
        SseEmitter sseEmitter = sseCache.get(id);
        if (sseEmitter != null) {
            sseEmitter.complete();
            sseCache.remove(id);
        }
        return "over";
    }
}

上面的實現,用到了 SseEmitter 的幾個方法,解釋如下

  • send(): 發送數據,如果傳入的是一個非SseEventBuilder對象,那么傳遞參數會被封裝到 data 中
  • complete(): 表示執行完畢,會斷開連接
  • onTimeout(): 超時回調觸發
  • onCompletion(): 結束之后的回調觸發

同樣演示一下訪問請求

上圖總的效果和前面的效果差不多,而且輸出還待上了前綴,接下來我們寫一個簡單的 html 消費端,用來演示一下完整的 sse 的更多特性

<!doctype html>
<html lang="en">
<head>
    <title>Sse測試文檔</title>
</head>
<body>
<div>sse測試</div>
<div id="result"></div>
</body>
</html>
<script>
    var source = new EventSource('http://localhost:8080/sse/subscribe?id=yihuihui');
    source.onmessage = function (event) {
        text = document.getElementById('result').innerText;
        text += '\n' + event.data;
        document.getElementById('result').innerText = text;
    };
    <!-- 添加一個開啟回調 -->
    source.onopen = function (event) {
        text = document.getElementById('result').innerText;
        text += '\n 開啟: ';
        console.log(event);
        document.getElementById('result').innerText = text;
    };
</script>

將上面的 html 文件放在項目的resources/static目錄下;然后修改一下前面的SseRest

@Controller
@RequestMapping(path = "sse")
public class SseRest {
    @GetMapping(path = "")
    public String index() {
        return "index.html";
    }

    @ResponseBody
    @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public SseEmitter push(String id) {
        // 超時時間設置為3s,用於演示客戶端自動重連
        SseEmitter sseEmitter = new SseEmitter(1_000L);
        // 設置前端的重試時間為1s
        sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功"));
        sseCache.put(id, sseEmitter);
        System.out.println("add " + id);
        sseEmitter.onTimeout(() -> {
            System.out.println(id + "超時");
            sseCache.remove(id);
        });
        sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
        return sseEmitter;
    }
}

我們上面超時時間設置的比較短,用來測試下客戶端的自動重連,如下,開啟的日志不斷增加

其次將 SseEmitter 的超時時間設長一點,再試一下數據推送功能

請注意上面的演示,當后端結束了長連接之后,客戶端會自動重新再次連接,不用寫額外的重試邏輯了,就這么神奇

3. 小結

本篇文章介紹了 SSE 的相關知識點,並對比 websocket 給出了 sse 的優點(至於啥優點請往上翻)

請注意,本文雖然介紹了兩種 sse 的方式,第一種借助異步請求來實現,如果需要完成 sse 的規范要求,需要自己做一些適配,如果需要了解 sse 底層實現原理的話,可以參考一下;在實際的業務開發中,推薦使用SseEmitter

IV. 其他

0. 項目

系列博文

源碼

1. 一灰灰 Blog

盡信書則不如,以上內容,純屬一家之言,因個人能力有限,難免有疏漏和錯誤之處,如發現 bug 或者有更好的建議,歡迎批評指正,不吝感激

下面一灰灰的個人博客,記錄所有學習和工作中的博文,歡迎大家前去逛逛

一灰灰blog


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM