1. Java后端实现Websocket服务器
1.1 引入websocket相关依赖
<!-- websocket相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1.2 配置在tomcat容器启动和不在tomcat启动两种使用情况
application.yml中配置
my:
websocket:
tomcatflag: true #是否是内置Tomcat容器,true内置Tomcat,false外部Tomcat
1.3 添加配置类:
package cnki.bdms.web.ui.config;
/**
* @ClassName: MyServerConfig
* @Description: (这里用一句话描述这个类的作用)
* @author: SongBin
* @date: 2018年9月25日 上午10:40:56
*/
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author SongBin on 2018/9/25.
*/
@ConfigurationProperties(prefix = "my.websocket")
public class MyServerConfig
{
private boolean tomcatflag;
// private String wsurl;
/**
* @return the tomcatflag
*/
public boolean isTomcatflag() {
return tomcatflag;
}
/**
* @param tomcatflag the tomcatflag to set
*/
public void setTomcatflag(boolean tomcatflag) {
this.tomcatflag = tomcatflag;
}
}
package cnki.bdms.web.ui.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @author SongBin on 2018/9/18.
*/
@Configuration
@EnableConfigurationProperties(MyServerConfig.class)
public class WebSocketConfig {
@Autowired
private MyServerConfig myser;
@Bean
public ServerEndpointExporter serverEndpointExporter() {
if(myser.isTomcatflag()) {
return new ServerEndpointExporter();
}else{
System.out.println("内置Tomcat容器不需要初始化ServerEndpointExporter");
return null;
}
}
}
1.4 实现websocket收发消息服务
package cnki.bdms.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author SongBin on 2018/9/18.
*/
/**
* @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
* 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
*/
@ServerEndpoint("/websocket/{sid}")
@Component
public class WebSocketServer {
//static Log log=LogFactory.get(WebSocketServer.class);
private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid="";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid) {
this.session = session;
this.sid=sid;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.debug("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("websocket IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.debug("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
log.debug("收到来自窗口"+sid+"的信息:"+message);
//群发消息
for (WebSocketServer item : webSocketSet) {
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
* The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method的问题在于:handlerA和handlerB两个方法有可能同时执行,当A或者B方法遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报TEXT_FULL_WRITING异常。
* 一般采用的解决方案是使用同步锁加同步发送(session.getBasicRemote())的方式,但很多时候需要异步发送。
* 在我的项目中,采用Tomcat,TEXT_FULL_WRITING会直接导致websocket连接断掉,采用这种解决方案能保证一个session同时只会在发送一条消息,所以避免了TEXT_FULL_WRITING错误。
*/
public void sendMessage(String message) throws IOException {
synchronized(session){
this.session.getBasicRemote().sendText(message);
}
}
/**
* 实现服务器主动推送
*/
public void sendMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
* */
public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
log.debug("推送消息到窗口"+sid+",推送内容:"+message);
for (WebSocketServer item : webSocketSet) {
try {
//这里可以设定只推送给这个sid的,为null则全部推送
if(sid==null) {
item.sendMessage(message);
}else if(item.sid.equals(sid)){
item.sendMessage(message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
2. Vue实现前端连接,心跳机制,重连机制
websocket.js
/*
* @Description: websocket mixins文件
* @Author: chenfengtao
* @Date: 2019-12-13 13:59:12
* @LastEditors: chenfengtao
* @LastEditTime: 2020-06-30 10:56:56
*/
/**
* 1、接收到消息时,开启心跳包;连接关闭或者出错时,停止发送心跳包
* 2、主动停止连接,重新连接
* 3、该文件混入到组件中之后,重写ws监听方法
*/
export const websocket = {
data () {
return {
ws: null,
// 发送心跳包间隔时间
timeout: 60000, // 60s
timer: null,
limitCount: 0, //重连次数限制
reconnecttimer: null,
lockReconnect: false, //避免重复连接,
heartBeatTxt: '{"HeartBeat": true}' // 心跳包发送的消息内容
}
},
methods: {
initSocket (wspath) {
if (typeof (WebSocket) === 'undefined') {
console.error('浏览器不支持WebSocket')
} else {
// 实例化
this.ws = new WebSocket(wspath)
// 监听连接成功
this.ws.onopen = this.onopen
// 监听接收消息
this.ws.onmessage = this.onmessage
// 监听关闭连接
this.ws.onclose = this.onclose
// 监听错误消息
this.ws.onerror = this.onerror
}
},
reconnect(wspath) {
let _this = this
debugger
console.log('已触发重连')
if(this.lockReconnect) {
return
};
this.lockReconnect = true
this.limitCount += 1
this.reconnecttimer && clearTimeout(this.reconnecttimer)
// if(this.limitCount < 12){
//没连接上会重连12次,设置延迟避免请求过多
this.reconnecttimer = setTimeout(function () {
_this.initSocket(wspath)
_this.lockReconnect = false
console.log('第'+_this.limitCount+'次重连!')
}, 2000)
// this.limitCount = this.limitCount + 1
// }
},
onopen () {
console.log('连接成功')
},
onmessage () {
console.log('收到消息')
},
onclose () {
console.log('连接关闭')
},
onerror () {
console.log('连接出错')
},
// 主动关闭连接
stopConnect () {
this.ws.close()
this.ws = null
console.log('取消连接')
},
// 主动重连
// reConnect () {
// this.ws = null
// this.init()
// },
// 开始发送心跳包
startHeartBeat () {
debugger
let _this = this
this.timer = setInterval(function () {
if (_this.ws.readyState === 1) {
_this.ws.send(_this.heartBeatTxt)
}else {
console.log('心跳断开,服务端断开,执行ws.close()触发重连!')
_this.ws.close()
}
}, this.timeout)
},
// 停止发送心跳包
stopHeartBeat () {
clearInterval(this.timer)
},
// 停止重连
// stopReConnect () {
// clearInterval(this.reconnecttimer)
// },
// 重置心跳包
resetHeartBeat () {
clearInterval(this.timer)
this.startHeartBeat()
}
}
}
使用的时候在页面引入websocket.js
重写
onmessage,
onopen () {
console.log('成功打开连接')
debugger
// 发送心跳包
this.startHeartBeat()
},
onclose () {
console.log('连接关闭,重新发起连接')
this.reconnect(this.wspath)
},
onerror () {
console.log('连接出错,重新发起连接')
this.reconnect(this.wspath)
},
这些方法实现自己的业务逻辑,断开重连,出错重连,心跳请求状态不为1,断开连接触发onclose里的重连,在页面destroyed时关闭连接
destroyed () {
if (this.ws) {
console.log('取消连接destroyed')
this.stopConnect()
this.stopHeartBeat()
}
}
3.参考
https://blog.csdn.net/weixin_41358306/article/details/102582203
注意:本文归作者所有,未经作者允许,不得转载