概述
SSE(Server-Sent Events)是一种Web技术,它允许服务器实时向客户端推送数据。相比于传统的轮询和长轮询机制,SSE提供了一种更高效且实时的数据推送方式。这种技术主要应用于构建实时应用,例如实时消息推送、股票行情更新等。
SSE是HTML5规范中的一个通信相关API,它主要包含两个部分:服务端与浏览器端的通信协议(基于HTTP协议),以及浏览器端JavaScript可使用的EventSource对象。
SSE运行在HTTP协议之上,它允许服务器以事件流(Event Stream)的形式将数据发送给客户端。客户端通过建立持久化的HTTP连接,并监听这个事件流,从而可以实时接收到服务器推送的数据。
Server-Send Events Api |
WebSockets Api |
基于 HTTP 协议 |
基于 TCP 协议 |
单工,只能服务端单向发送消息 |
全双工,可以同时发送和接收消息 |
轻量级,使用简单 |
相对复杂 |
内置断线重连和消息追踪的功能 |
不在协议范围内,需手动实现 |
文本或使用 Base64 编码和 gzip 压缩的二进制消息 |
类型广泛 |
支持自定义事件类型 |
不支持自定义事件类型 |
连接数 HTTP/1.1 6 个,HTTP/2 可协商(默认 100) |
连接数无限制 |
特点
简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,这使得数据发送和解析都相对简单直接。
单向通信:SSE仅支持从服务器到客户端的单向通信。这意味着服务器可以主动推送数据给客户端,但客户端只能被动接收数据。
实时性:由于SSE能够建立持久化连接,服务器因此可以实时地将数据推送给客户端,无需客户端频繁地发起请求。这大大提高了数据传输的效率和实时性。
实现原理
连接建立:通常情况下,客户端(如浏览器)通过发送HTTP GET请求到服务器来请求建立一个SSE连接。
服务器响应:一旦服务器接收到请求,它将返回一个HTTP响应,该响应的状态码为200,内容类型(Content-Type)设置为”text/event-stream”。
数据推送:服务器可以通过已经建立的连接向客户端推送数据。每次推送的数据被称作一个事件(Event)。每个事件由一个或多个以”\n\n”分隔的数据块组成。每个数据块都是一行文本,可能包含一个以”:”开头的注释行、以”data:”开头的数据行,或者以”id:”和”event:”开头的行来指定事件ID和事件类型。
客户端处理:当客户端接收到服务器推送的事件后,它会触发相应的JavaScript事件处理器来处理这些事件。
重连:如果连接断开,客户端会自动尝试重新连接。如果服务器在事件中指定了ID,那么在重新连接时,客户端会发送一个”Last-Event-ID”的HTTP头部信息到服务器,告诉服务器客户端接收到的最后一个事件的ID。根据这个信息,服务器可以决定从哪个事件开始重新发送数据。
总结起来,SSE使用了基于文本和HTTP协议的简单机制,使得服务器能够实时地将数据推送到客户端,而无需客户端频繁地发起新的请求。
注意事项
以下是在使用SSE(Server-Sent Events)技术进行实时数据推送时需要注意的几个关键点:
异步处理:由于SSE基于长连接的机制,因此数据推送过程可能会持续较长时间。为了防止服务器线程被阻塞,建议采用异步方式处理SSE请求。例如,可以在控制器方法中使用@Async注解或利用CompletableFuture等异步编程方式。
超时处理:SSE连接可能会因网络中断、客户端关闭等原因而超时。为了避免无效连接占据服务器资源,建议设置超时时间并处理超时情况。例如,可以利用SseEmitter对象的setTimeout()方法设定超时时间,并通过onTimeout()方法处理超时逻辑。
异常处理:在实际应用中,可能会遇到网络异常、数据推送失败等问题。这种情况下,可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
内存管理:在使用SseEmitter时,需要特别注意内存管理问题,尤其是在大量并发连接的场景下。当客户端断开连接后,务必及时释放SseEmitter对象,以避免资源泄漏和内存溢出。
并发性能:SSE的并发连接数可能对服务器性能产生影响。如果需要处理大量并发连接,可以考虑使用线程池或其他异步处理方式,以最大化服务器资源利用。
客户端兼容性:虽然大多数现代浏览器都支持SSE,但一些旧版本的浏览器可能不支持。因此,在使用SSE时,需要确保目标客户端对其有良好的支持,或者提供备选的实时数据推送机制。
以上这些注意事项可以根据具体应用需求进行调整和优化。在实际应用中,确保服务器的稳定性、安全性和性能是非常重要的。同时,在处理SSE连接时,可以考虑适当的限流和安全控制措施,以防止滥用和恶意连接的出现。总的来说,使用SSE技术时需要全面考虑各个方面的因素,才能实现高效、稳定、安全的实时数据推送服务。
ChatGPT 为什么选择 SSE,而非 WebSocket
要理解这个选择,我们需要关注ChatGPT的使用场景。作为一个基于深度学习的大型语言模型,ChatGPT需要处理大量的自然语言数据,这无疑需要大量的计算资源和时间。相较于普通的读取数据库操作,其响应速度自然会慢许多。
对于这种可能需要长时间等待响应的对话场景,ChatGPT采用了一种巧妙的策略:它会将已经计算出的数据“推送”给用户,并利用SSE技术在计算过程中持续返回数据。这样做可以避免用户因等待时间过长而选择关闭页面。
服务端实现
协议
SSE 协议非常简单,本质是浏览器发起 http 请求,服务器在收到请求后,返回状态与数据,并附带以下 headers:
1 2 3
| Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive
|
- SSE API规定推送事件流的 MIME 类型为 text/event-stream。
- 必须指定浏览器不缓存服务端发送的数据,以确保浏览器可以实时显示服务端发送的数据。
- SSE 是一个一直保持开启的 TCP 连接,所以 Connection 为 keep-alive。
消息格式
EventStream(事件流)为 UTF-8 格式编码的文本或使用 Base64 编码和 gzip 压缩的二进制消息。
每条消息由一行或多行字段(event、id、retry、data)组成,每个字段组成形式为:字段名:字段值。字段以行为单位,每行一个(即以 \n 结尾)。以冒号开头的行为注释行,会被浏览器忽略。
每次推送,可由多个消息组成,每个消息之间以空行分隔(即最后一个字段以\n\n结尾)。
📢注意:
- 除上述四个字段外,其他所有字段都会被忽略。
- 如果一行字段中不包含冒号,则整行文本将被视为字段名,字段值为空。
- 注释行可以用来防止链接超时,服务端可以定期向浏览器发送一条消息注释行,以保持连接不断。
event
事件类型。如果指定了该字段,则在浏览器收到该条消息时,会在当前 EventSource 对象(见 4)上触发一个事件,事件类型就是该字段的字段值。可以使用 addEventListener 方法在当前 EventSource 对象上监听任意类型的命名事件。
如果该条消息没有 event 字段,则会触发 EventSource 对象 onmessage 属性上的事件处理函数。
id
事件ID。事件的唯一标识符,浏览器会跟踪事件ID,如果发生断连,浏览器会把收到的最后一个事件ID放到 HTTP Header Last-Event-Id 中进行重连,作为一种简单的同步机制。
例如可以在服务端将每次发送的事件ID值自动加 1,当浏览器接收到该事件ID后,下次与服务端建立连接后再请求的 Header 中将同时提交该事件ID,服务端检查该事件ID是否为上次发送的事件ID,如果与上次发送的事件ID不一致则说明浏览器存在与服务器连接失败的情况,本次需要同时发送前几次浏览器未接收到的数据。
retry
重连时间。整数值,单位 ms,如果与服务器的连接丢失,浏览器将等待指定时间,然后尝试重新连接。如果该字段不是整数值,会被忽略。
当服务端没有指定浏览器的重连时间时,由浏览器自行决定每隔多久与服务端建立一次连接(一般为 30s)。
data
如下事件流示例,共发送了 4 条消息,每条消息间以一个空行作为分隔符。
第一条仅仅是个注释,因为它以冒号开头。
第二条消息只包含一个 data 字段,值为 ‘this is second message’。
第三条消息包含两个 data 字段,其会被解析为一个字段,值为 ‘this is third message part 1\nthis is third message part 2’。
第四条消息包含完整四个字段,指定了事件类型为 ‘server-time’,事件id 为 ‘1’,重连时间为 ‘30000’ms,消息数据为 JSON 格式的 ‘{“text”: “this is fourth message”, “time”: “12:00:00”}’。
1 2 3 4 5 6 7 8 9 10 11 12
| : this is first message\n\n
data: this is second message\n\n
data: this is third message part one\n data this is third message part two\n\n
event: server-time\n id: 1 retry: 30000\n data: {"text": "this is fourth message", "time": "2023-04-09 12:00:00"}\n\n
|
浏览器 API
在浏览器端,可以使用 JavaScript 的 EventSource API 创建 EventSource 对象监听服务器发送的事件。一旦建立连接,服务器就可以使用 HTTP 响应的 ‘text/event-stream’ 内容类型发送事件消息,浏览器则可以通过监听 EventSource 对象的 onmessage、onopen 和 onerror 事件来处理这些消息。
建立连接
EventSource 接受两个参数:URL 和 options。
URL 为 http 事件来源,一旦 EventSource 对象被创建后,浏览器立即开始对该 URL 地址发送过来的事件进行监听。
options 是一个可选的对象,包含 withCredentials 属性,表示是否发送凭证(cookie、HTTP认证信息等)到服务端,默认为 false。
1
| const eventSource = new EventSource('http_api_url', { withCredentials: true })
|
与 XMLHttpRequest 对象类型,EventSource 对象有一个 readyState 属性值,具体含义如下表:
readState |
含义 |
0 |
浏览器与服务端尚未建立连接或连接已被关闭 |
1 |
浏览器与服务端已成功连接,浏览器正在处理接收到的事件及数据 |
2 |
浏览器与服务端建立连接失败,客户端不再继续建立与服务端之间的连接 |
可以使用 EventSource 对象的 close 方法关闭与服务端之间的连接,使浏览器不再建立与服务端之间的连接。
监听事件
EventSource 对象本身继承自 EventTarget 接口,因此可以使用 addEventListener() 方法来监听事件。EventSource 对象触发的事件主要包括以下三种:
- open 事件:当成功连接到服务端时触发。
- message 事件:当接收到服务器发送的消息时触发。该事件对象的 data 属性包含了服务器发送的消息内容。
- error 事件:当发生错误时触发。该事件对象的 event 属性包含了错误信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
eventSource.addEventListener('open', function(event) { console.log('Connection opened') })
eventSource.addEventListener('message', function(event) { console.log('Received message: ' + event.data); })
eventSource.addEventListener('xxx', function(event) { console.log('Received message: ' + event.data); })
eventSource.addEventListener('error', function(event) { console.log('Error occurred: ' + event.event); })
|
当然,也可以采用属性监听(onopen、onmessage、onerror)的形式。
1 2 3 4 5 6 7 8 9 10 11 12 13
| / 初始化 eventSource 等省略
eventSource.onopen = function(event) { console.log('Connection opened') }
eventSource.onmessage = function(event) { console.log('Received message: ' + event.data); }
eventSource.onerror = function(event) { console.log('Error occurred: ' + event.event); })
|
📢注意:
EventSource 对象的属性监听只能监听预定义的事件类型(open、message、error)。不能用于监听自定义事件类型。如果要实现自定义事件类型的监听,可以使用 addEventListener() 方法。
实践
服务端
使用 Node.js 实现 SSE 的简单示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| const http = require('http') const fs = require('fs')
http.createServer((req, res) => { const url = req.url if (url === '/' || url === 'index.html') { fs.readFile('index.html', (err, data) => { if (err) { res.writeHead(500) res.end('Error loading') } else { res.writeHead(200, {'Content-Type': 'text/html'}) res.end(data) } }) } else if (url.includes('/sse')) { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', })
let id = 0 const intervalId = setInterval(() => { res.write(`event: customEvent\n`) res.write(`id: ${id}\n`) res.write(`retry: 30000\n`) const params = url.split('?')[1] const data = { id, time: new Date().toISOString(), params } res.write(`data: ${JSON.stringify(data)}\n\n`) id++ if (id >= 10) { clearInterval(intervalId) res.end() } }, 1000)
req.on('close', () => { clearInterval(intervalId) id = 0 res.end() }) } else { res.writeHead(404) res.end() } }).listen(3000)
console.log('Server listening on port 3000')
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>SSE Demo</title> </head> <body> <h1>SSE Demo</h1> <button onclick="connectSSE()">建立 SSE 连接</button> <button onclick="closeSSE()">断开 SSE 连接</button> <br /> <br /> <div id="message"></div>
<script> const messageElement = document.getElementById('message')
let eventSource
const connectSSE = () => { eventSource = new EventSource('http://127.0.0.1:3000/sse?content=xxx')
eventSource.addEventListener('customEvent', (event) => { const data = JSON.parse(event.data) messageElement.innerHTML += `${data.id} --- ${data.time} --- params参数:${JSON.stringify(data.params)}` + '<br />' })
eventSource.onopen = () => { messageElement.innerHTML += `SSE 连接成功,状态${eventSource.readyState}<br />` }
eventSource.onerror = () => { messageElement.innerHTML += `SSE 连接错误,状态${eventSource.readyState}<br />` } }
const closeSSE = () => { eventSource.close() messageElement.innerHTML += `SSE 连接关闭,状态${eventSource.readyState}<br />` } </script> </body> </html>
|
将上面的两份代码保存为 server.go 和 index.html,并在命令行中执行 node server.js 启动服务端,然后在浏览器中打开 http://localhost:3000 即可看到 SSE 效果。
兼容性
发展至今,SSE 已具有广泛的的浏览器兼容性,几乎除 IE 之外的浏览器均已支持。
对于不支持 EventSource 的浏览器,可以使用 polyfill 实现。判断浏览器是否支持 EventSource:
1 2 3 4 5
| if(typeof(EventSource) !== “undefined”) { } else { }
|
Fetch 的实现
虽然使用 SSE 技术可以实现 ChatGPT 一样的打字机效果,但是通过上文请求 type 对比可以发现,在使用 SSE 时,type 为 eventSource,而 ChatGPT 为 fetch。且受浏览器 EventSource API 限制,在使用 SSE 时不能自定义请求头、只能发出 GET 请求,且在大多数浏览器中,URL 限制 2000个字符,也无法满足 ChatGPT 参数传递需求。
此时,可以使用 Fetch API 实现一个替代接口,用于模拟 SSE 实现。简单实现如下:
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| const http = require('http') const fs = require('fs')
http.createServer((req, res) => { const url = req.url if (url === '/' || url === 'index-fetch.html') { fs.readFile('index-fetch.html', (err, data) => { if (err) { res.writeHead(500) res.end('Error loading') } else { res.writeHead(200, {'Content-Type': 'text/html'}) res.end(data) } }) } else if (url.includes('/fetch-sse')) { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', }) let body = '' req.on('data', chunk => { body += chunk }) let id = 0 const intervalId = setInterval(() => { const data = { id, time: new Date().toISOString(), body: JSON.parse(body) } res.write(JSON.stringify(data)) id++ if (id >= 10) { clearInterval(intervalId) res.end() } }, 1000)
req.on('close', () => { clearInterval(intervalId) id = 0 res.end() }) } else { res.writeHead(404) res.end() } }).listen(3001)
console.log('Server listening on port 3001')
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>fetchSSE Demo</title> </head> <body> <h1>fetchSSE Demo</h1> <button onclick="connectFetch()">建立 fetchSSE 连接</button> <button onclick="closeSSE()">断开 fetchSSE 连接</button> <br /> <br /> <div id="message"></div>
<script> const messageElement = document.getElementById('message') let controller
const connectFetch = () => { controller = new AbortController() fetchEventSource('http://127.0.0.1:3001/fetch-sse', { method: 'POST', body: JSON.stringify({ content: 'xxx' }), signal: controller.signal, onopen: () => { messageElement.innerHTML += `FETCH 连接成功<br />` }, onclose: () => { messageElement.innerHTML += `FETCH 连接关闭<br />` }, onmessage: (event) => { const data = JSON.parse(event) messageElement.innerHTML += `${data.id} --- ${data.time} --- body参数:${JSON.stringify(data.body)}` + '<br />' }, onerror: (e) => { console.log(e) } }) }
const closeSSE = () => { if (controller) { controller.abort() controller = undefined messageElement.innerHTML += `FETCH 连接关闭<br />` } }
const fetchEventSource = (url, options) => { fetch(url, options) .then(response => { if (response.status === 200) { options.onopen && options.onopen() return response.body } }) .then(rb => { const reader = rb.getReader() const push = () => { return reader.read().then(({done, value}) => { if (done) { options.onclose && options.onclose() return } options.onmessage && options.onmessage(new TextDecoder().decode(value)) return push() }) } return push() }) .catch((e) => { options.error && options.error(e) }) } </script>
</html>
|
💡不同于 XMLHttpRequest,fetch 并未原生提供终止操作方法,可以通过 DOM API AbortController 和 AbortSignal 实现 fetch 请求终止操作。
将上面的两份代码保存为 server-fetch.js 和 index-fetch.html,并在命令行中执行 node server-fetch.js 启动服务端,然后在浏览器中打开 http://localhost:3001 即可看到 fetch 版 SSE 效果。
总结
SSE 技术是一种轻量级的实时通信技术,基于 HTTP 协议,具有服务端推送、断线重连、简单轻量等优点。但是,SSE 技术也有一些缺点,如不能进行双向通信、连接数受限、仅支持 get 请求等。
SSE 可以在 Web 应用程序中实现诸如股票在线数据、日志推送、聊天室实时人数等即时数据推送功能。需要注意的是,SSE 并不是适用于所有的实时推送场景。在需要高并发、高吞吐量和低延迟的场景下,WebSockets 可能更加适合。而在需要更轻量级的推送场景下,SSE 可能更加适合。因此,在选择即时更新方案时,需要根据具体的需求和场景进行选择。