0赞
赞赏
更多好文
从0到1:Spring Boot 中WebSocket实战揭秘,开启实时通信新时代
引言:实时通信的需求与挑战
在当今数字化时代,互联网应用的实时交互需求日益增长。从在线聊天、股票行情实时更新,到多人协作办公、在线游戏等场景,实时通信已成为提升用户体验和业务效率的关键因素。传统的 HTTP 协议基于请求 - 响应模式,客户端发起请求,服务器被动响应,这种模式在实时通信场景中存在诸多局限性,如高延迟、高开销以及单向性(服务器无法主动推送数据,需客户端轮询) 。为了满足实时通信的需求,WebSocket 技术应运而生,它以独特的优势为现代互联网应用提供了高效、低延迟的实时通信解决方案,成为了众多开发者构建实时应用的首选技术之一。
WebSocket 技术基础
(一)什么是 WebSocket
WebSocket 是一种基于 TCP 的全双工通信协议,于 2011 年被 IETF 标准化为 RFC 6455 ,并由 W3C 制定了相应的 API 标准。与传统 HTTP 的单向请求 - 响应模式不同,WebSocket 允许客户端和服务器在建立一次连接后,就可以相互主动发送和接收数据,实现真正意义上的双向实时通信。举个例子,在在线聊天场景中,使用 HTTP 协议时,客户端需要不断发送请求获取新消息,而 WebSocket 则能让服务器在有新消息时直接推送给客户端,无需客户端频繁请求,大大提高了通信效率和实时性 。
(二)WebSocket 与 HTTP 的区别
-
通信方式:HTTP 是单向通信,客户端发起请求,服务器响应,服务器无法主动向客户端推送数据;WebSocket 是全双工通信,连接建立后,客户端和服务器可随时双向发送数据。
-
连接状态:HTTP 是无状态协议,每次请求都是独立的,服务器不保存客户端状态信息;WebSocket 是有状态协议,连接建立后会维持状态,服务器能识别客户端身份和状态。
-
连接建立:HTTP 基于请求 - 响应模式,一次请求 - 响应后连接通常关闭(除非使用 Keep - Alive);WebSocket 通过 HTTP 协议进行握手,握手成功后建立持久连接,后续基于此连接进行双向通信。
-
适用场景:HTTP 适用于获取静态资源、普通网页浏览等客户端主动获取数据的场景;WebSocket 适用于实时聊天、在线游戏、股票行情实时推送、实时监控等需要实时双向通信的场景。
(三)WebSocket 工作原理
-
握手阶段:客户端向服务器发送一个特殊的 HTTP 请求,请求头包含
Upgrade: websocket和Connection: Upgrade,表示希望将连接升级为 WebSocket 协议,同时还包含一个经过 Base64 编码的随机字符串Sec - WebSocket - Key用于安全验证。服务器接收到请求后,若支持 WebSocket 协议,会对Sec - WebSocket - Key进行处理(将其与固定字符串258EAFA5 - E914 - 47DA - 95CA - C5AB0DC85B11拼接,进行 SHA - 1 哈希计算,再进行 Base64 编码),生成Sec - WebSocket - Accept,并返回HTTP 101 Switching Protocols响应,包含Upgrade: websocket、Connection: Upgrade以及Sec - WebSocket - Accept等头部信息,完成握手,连接升级为 WebSocket 连接。 -
数据传输阶段:握手成功后,客户端和服务器通过该 TCP 连接进行双向数据传输。WebSocket 的数据传输以帧为单位,数据帧包含
FIN(表示是否是消息的最后一个片段)、RSV1、RSV2、RSV3(通常为 0 ,用于自定义扩展)、Opcode(表示数据类型,如文本、二进制、关闭连接、ping、pong 等)、Mask(客户端发送数据时必须设置为 1 ,用于掩码处理)、Payload length(数据负载长度)、Masking - key(掩码密钥,当Mask为 1 时存在)和Payload data(实际传输的数据)等字段。通过这种数据帧格式,双方可以高效地进行实时数据交互 。
Spring Boot 集成 WebSocket 实战
了解了 WebSocket 的基础知识后,接下来我们通过一个具体的 Spring Boot 项目实战,深入学习如何在 Spring Boot 中集成和使用 WebSocket ,实现一个简单的实时消息推送功能。
(一)创建 Spring Boot 项目
首先,我们使用 Spring Initializr 来快速创建一个 Spring Boot 项目。打开浏览器,访问https://start.spring.io/ ,在页面中进行如下配置:
-
项目基本信息:选择 Maven 项目,语言为 Java ,Spring Boot 版本根据需求选择(这里以最新稳定版本为例),填写 Group 和 Artifact 等基本信息。
-
依赖选择:在依赖搜索框中,依次添加 “Spring Web” 和 “Spring WebSocket” 依赖,这两个依赖分别用于支持 Web 开发和 WebSocket 功能。点击 “Generate” 按钮,下载生成的项目压缩包,解压后用 IDE(如 IntelliJ IDEA、Eclipse 等)打开。
(二)配置 WebSocket
在项目中创建一个配置类,用于启用 WebSocket 支持并注册 WebSocket 端点。在src/main/java/your - package - name下创建WebSocketConfig.java文件,代码如下:
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册一个WebSocket处理器,映射到/ws路径,允许所有来源的跨域请求
registry.addHandler(myWebSocketHandler(), "/ws").setAllowedOrigins("*");
}
// 定义一个WebSocket处理器的Bean
@Bean
public MyWebSocketHandler myWebSocketHandler() {
return new MyWebSocketHandler();
}
}
在上述代码中,@Configuration注解表明这是一个配置类,@EnableWebSocket注解启用了 WebSocket 功能。registerWebSocketHandlers方法中,我们通过registry.addHandler注册了一个自定义的 WebSocket 处理器MyWebSocketHandler,并将其映射到/ws端点,同时通过setAllowedOrigins("*")允许所有来源的跨域请求(在生产环境中应根据实际情况限制跨域来源) 。
(三)编写 WebSocket 处理器
创建一个自定义的 WebSocket 处理器类,用于处理 WebSocket 连接的打开、消息接收、连接关闭等事件。在src/main/java/your - package - name下创建MyWebSocketHandler.java文件,代码如下:
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.CopyOnWriteArraySet;
public class MyWebSocketHandler extends TextWebSocketHandler {
// 使用线程安全的集合来存储WebSocket会话
private static final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
System.out.println("新连接建立: " + session.getId());
session.sendMessage(new TextMessage("欢迎连接到WebSocket服务器,当前在线人数: " + sessions.size()));
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
System.out.println("收到消息: " + payload);
// 广播消息给所有连接的客户端
for (WebSocketSession webSocketSession : sessions) {
if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(new TextMessage("来自 " + session.getId() + " 的消息: " + payload));
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
System.out.println("连接关闭: " + session.getId());
if (sessions.size() > 0) {
for (WebSocketSession webSocketSession : sessions) {
if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(new TextMessage("用户 " + session.getId() + " 已离开,当前在线人数: " + sessions.size()));
}
}
}
}
}
在这个处理器类中:
-
afterConnectionEstablished方法在 WebSocket 连接建立后被调用,将新的会话添加到sessions集合中,并向客户端发送欢迎消息,包含当前在线人数。 -
handleTextMessage方法在接收到客户端发送的文本消息时被调用,获取消息内容并广播给所有在线的客户端,消息中包含发送者的会话 ID 。 -
afterConnectionClosed方法在 WebSocket 连接关闭时被调用,将会话从sessions集合中移除,并向其他在线客户端发送通知消息,告知有用户离开以及当前在线人数。
(四)前端页面示例
为了测试 WebSocket 功能,我们创建一个简单的 HTML 页面,通过 JavaScript 连接到后端的 WebSocket 服务。在src/main/resources/static目录下创建index.html文件,代码如下:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>WebSocket测试</title>
</head>
<body>
<h2>WebSocket实时通信测试</h2>
<input type="text" id="message" placeholder="输入消息">
<button onclick="sendMessage()">发送</button>
<div id="response"></div>
<script>
const socket = new WebSocket('ws://localhost:8080/ws');
socket.onopen = function () {
console.log('已连接到WebSocket服务器');
};
socket.onmessage = function (event) {
document.getElementById('response').innerHTML += event.data + '<br>';
};
socket.onclose = function () {
console.log('WebSocket连接已关闭');
};
function sendMessage() {
const message = document.getElementById('message').value;
socket.send(message);
document.getElementById('message').value = '';
}
</script>
</body>
</html>
在这个 HTML 页面中:
-
通过
new WebSocket('ws://localhost:8080/ws')创建一个 WebSocket 连接,指向后端的/ws端点。 -
onopen事件在连接建立时触发,在控制台打印连接成功信息。 -
onmessage事件在接收到服务器发送的消息时触发,将消息显示在页面的response区域。 -
onclose事件在连接关闭时触发,在控制台打印连接关闭信息。 -
sendMessage函数在用户点击 “发送” 按钮时被调用,获取输入框中的消息并通过 WebSocket 发送给服务器。
启动 Spring Boot 应用,打开浏览器访问http://localhost:8080/index.html,在页面中输入消息并点击发送,即可看到消息实时推送到页面,同时其他连接到该 WebSocket 服务的客户端也能收到广播消息,实现了简单的实时通信功能。
应用场景与案例分析
(一)常见应用场景
-
即时通讯:如微信、QQ 等社交软件的网页版或在线客服系统,WebSocket 能够实现消息的即时收发,提升沟通效率和用户体验,让用户感觉就像面对面交流一样,消息几乎无延迟 。
-
在线游戏:在多人在线游戏中,玩家的操作(如移动、攻击、释放技能等)需要实时同步给其他玩家。WebSocket 的低延迟和双向通信特性确保了游戏状态的及时更新,保证游戏的流畅性和公平性,避免因通信延迟导致的游戏体验不佳 。
-
实时监控:在工业监控、服务器状态监控、智能安防监控等场景中,WebSocket 可将监控数据实时推送给监控中心或管理人员。例如,工厂中的设备运行数据(温度、压力、转速等)可以通过 WebSocket 实时传输到监控大屏,一旦设备出现异常,能够立即发出警报,便于及时采取措施 。
-
股票行情与金融交易:股票交易平台、外汇交易平台等金融应用中,市场行情(股票价格、汇率等)瞬息万变。WebSocket 能实时推送最新的价格数据和交易信息,让投资者及时了解市场动态,做出准确的投资决策,抓住瞬息即逝的投资机会 。
-
协同办公:多人在线协作编辑文档、表格、思维导图等场景下,WebSocket 可以实时同步用户的操作,如文字输入、格式调整、图形绘制等,使团队成员能够实时看到彼此的修改,就像在同一时间、同一地点办公一样,提高协作效率 。
(二)案例展示
以某电商平台的实时订单提醒功能为例,该电商平台每天会产生大量订单,商家需要及时了解新订单的情况,以便快速处理订单,提高客户满意度。
-
需求分析:商家在登录电商平台的管理后台后,希望能够实时收到新订单的提醒消息,包括订单编号、下单时间、商品信息、客户信息等,无需手动刷新页面查询订单。
-
技术选型:后端采用 Spring Boot 作为开发框架,利用其强大的生态和便捷的开发特性;通信协议选择 WebSocket,以实现服务器主动向商家客户端推送订单消息。
-
实现方案
- 后端实现:在 Spring Boot 项目中,按照前文所述的集成 WebSocket 的步骤,配置 WebSocket 并编写处理器。当有新订单生成时,订单服务会将订单信息发送给 WebSocket 处理器,处理器将订单消息广播给所有连接的商家客户端。例如,在订单创建的业务逻辑中添加如下代码:
@Service
public class OrderService {
@Autowired
private WebSocketServer webSocketServer;
public void createOrder(Order order) {
// 保存订单到数据库等操作
//...
// 发送订单提醒消息
webSocketServer.sendMessageToAll("新订单提醒:订单编号 " + order.getOrderId() + ",下单时间 " + order.getCreateTime() + ",商品:" + order.getProductList());
}
}
- 前端实现:商家管理后台的前端页面通过 JavaScript 创建 WebSocket 连接到后端的 WebSocket 服务。当接收到新订单提醒消息时,在页面上以弹窗或消息列表的形式展示订单信息。例如:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>电商平台商家后台</title>
</head>
<body>
<h2>电商平台商家后台</h2>
<div id="order-notification"></div>
<script>
const socket = new WebSocket('ws://localhost:8080/ws/order');
socket.onopen = function () {
console.log('已连接到订单提醒服务');
};
socket.onmessage = function (event) {
const notificationDiv = document.getElementById('order - notification');
const message = document.createElement('div');
message.textContent = event.data;
notificationDiv.appendChild(message);
};
socket.onclose = function () {
console.log('订单提醒服务连接已关闭');
};
</script>
</body>
</html>
通过这样的实现,商家在使用电商平台管理后台时,能够实时收到新订单提醒,大大提高了订单处理效率,减少了订单处理的延迟,提升了客户满意度和商家运营效率 。
优化与扩展
(一)性能优化
- 设置合理的连接超时时间:在 WebSocket 配置中,设置合适的连接超时时间可以避免无效连接占用资源。例如,在 Spring Boot 中,可以通过
application.properties文件配置:spring.websocket.timeout=60000(单位为毫秒,这里设置为 60 秒) ,表示如果 60 秒内没有数据传输,连接将被关闭。也可以在 WebSocket 配置类中通过setHandshakeTimeout方法进行设置,如:
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWebSocketHandler(), "/ws")
.setAllowedOrigins("*")
.setHandshakeTimeout(60000);
}
//...
}
- 优化消息处理逻辑:避免在消息处理方法中执行耗时操作,如复杂的数据库查询、大规模数据计算等。如果有耗时任务,可将其放入异步线程池中执行,避免阻塞 WebSocket 连接线程。例如,使用 Spring 的
@Async注解将消息处理方法标记为异步方法:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
@Async
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 处理消息逻辑,这里的耗时操作将在异步线程中执行
String payload = message.getPayload();
//...
}
}
- 使用心跳机制:WebSocket 协议本身没有内置的保持活动功能,可通过发送定期的心跳消息来防止连接被关闭。在 Spring 中,可以扩展
TextWebSocketHandler并覆盖其方法以定义自定义心跳逻辑。例如,每隔一段时间(如 30 秒)向客户端发送一个 ping 消息,客户端接收到 ping 消息后返回 pong 消息,服务器通过判断是否收到 pong 消息来确定连接是否正常:
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class HeartbeatWebSocketHandler extends TextWebSocketHandler {
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static final long HEARTBEAT_INTERVAL = 30;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
startHeartbeat(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
stopHeartbeat(session);
}
private void startHeartbeat(WebSocketSession session) {
scheduler.scheduleAtFixedRate(() -> {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage("ping"));
} catch (Exception e) {
e.printStackTrace();
try {
session.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
private void stopHeartbeat(WebSocketSession session) {
scheduler.shutdownNow();
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
if ("pong".equals(payload)) {
// 接收到pong消息,说明连接正常
} else {
super.handleTextMessage(session, message);
}
}
}
(二)功能扩展
- 使用 STOMP 协议增强消息能力:STOMP(Simple Text Oriented Messaging Protocol)是一种简单的面向文本的消息协议,可在 WebSocket 上使用,为 WebSocket 通信提供了更丰富的消息处理能力,如消息的订阅与发布、消息的广播、点对点消息发送等。在 Spring Boot 中,通过
@EnableWebSocketMessageBroker注解启用消息代理,并配置相关的消息前缀等。例如:
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
}
}
在上述配置中,/topic用于广播消息(所有订阅该主题的客户端都能收到消息),/queue用于点对点消息(只有特定的接收者能收到消息),/app是客户端发送消息的前缀 。客户端可以通过/app前缀发送消息到服务器,服务器处理后通过/topic或/queue前缀将消息发送给相应的客户端。
2. 实现消息格式结构化:为了更好地处理不同类型的消息,提高消息的可读性和可维护性,可以将消息格式结构化,例如使用 JSON 格式。在消息发送端,将消息对象转换为 JSON 字符串发送,在接收端,将接收到的 JSON 字符串解析为消息对象。例如,定义一个消息类:
import com.fasterxml.jackson.annotation.JsonProperty;
public class Message {
@JsonProperty("type")
private String type;
@JsonProperty("content")
private String content;
public Message(String type, String content) {
this.type = type;
this.content = content;
}
// Getter 和 Setter 方法
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
在发送消息时:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
public class MyWebSocketHandler extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
ObjectMapper objectMapper = new ObjectMapper();
Message msg = objectMapper.readValue(payload, Message.class);
// 根据消息类型进行不同的处理
if ("chat".equals(msg.getType())) {
// 处理聊天消息
} else if ("system".equals(msg.getType())) {
// 处理系统消息
}
}
}
- 持久化聊天记录:如果应用需要保存聊天记录,以便用户后续查看历史聊天内容,可以将聊天消息存储到数据库中。使用 Spring Data JPA 等 ORM 框架,定义消息实体类和对应的 Repository 接口,在消息处理方法中,将消息保存到数据库。例如,定义消息实体类:
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.time.LocalDateTime;
@Entity
public class ChatMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String sender;
private String receiver;
private String content;
private LocalDateTime timestamp;
// 构造方法、Getter 和 Setter 方法
public ChatMessage() {
}
public ChatMessage(String sender, String receiver, String content, LocalDateTime timestamp) {
this.sender = sender;
this.receiver = receiver;
this.content = content;
this.timestamp = timestamp;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
public String getReceiver() {
return receiver;
}
public void setReceiver(String receiver) {
this.receiver = receiver;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public LocalDateTime getTimestamp() {
return timestamp;
}
public void setTimestamp(LocalDateTime timestamp) {
this.timestamp = timestamp;
}
}
定义 Repository 接口:
import org.springframework.data.jpa.repository.JpaRepository;
public interface ChatMessageRepository extends JpaRepository<ChatMessage, Long> {
}
在消息处理方法中保存消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.time.LocalDateTime;
public class MyWebSocketHandler extends TextWebSocketHandler {
@Autowired
private ChatMessageRepository chatMessageRepository;
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
// 假设消息格式为 "sender:receiver:content"
String[] parts = payload.split(":", 3);
String sender = parts[0];
String receiver = parts[1];
String content = parts[2];
ChatMessage chatMessage = new ChatMessage(sender, receiver, content, LocalDateTime.now());
chatMessageRepository.save(chatMessage);
}
}
