websocket实现心跳机制和断线重连

wylc123 1年前 ⋅ 2181 阅读

1. Java后端实现Websocket服务器   

1.1 引入websocket相关依赖
<!-- websocket相关 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>
1.2 配置在tomcat容器启动和不在tomcat启动两种使用情况

application.yml中配置

my:
  websocket:
    tomcatflag: true #是否是内置Tomcat容器,true内置Tomcat,false外部Tomcat
1.3 添加配置类:
package cnki.bdms.web.ui.config;

/**
 * @ClassName: MyServerConfig
 * @Description: (这里用一句话描述这个类的作用)
 * @author: SongBin
 * @date: 2018年9月25日 上午10:40:56
 */

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @author SongBin on 2018/9/25.
 */
@ConfigurationProperties(prefix = "my.websocket")
public class MyServerConfig
{
    private boolean tomcatflag;
//    private String wsurl;
	/**
	 * @return the tomcatflag
	 */
	public boolean isTomcatflag() {
		return tomcatflag;
	}

	/**
	 * @param tomcatflag the tomcatflag to set
	 */
	public void setTomcatflag(boolean tomcatflag) {
		this.tomcatflag = tomcatflag;
	}
    
}
package cnki.bdms.web.ui.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author SongBin on 2018/9/18.
 */
@Configuration
@EnableConfigurationProperties(MyServerConfig.class)
public class WebSocketConfig {
	@Autowired
    private MyServerConfig myser;
	@Bean
    public ServerEndpointExporter serverEndpointExporter() {
		if(myser.isTomcatflag()) {
			return new ServerEndpointExporter();
		}else{
			System.out.println("内置Tomcat容器不需要初始化ServerEndpointExporter");
			return null;
		}
    }
}
1.4 实现websocket收发消息服务
package cnki.bdms.base;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author SongBin on 2018/9/18.
 */
/**
 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 */
@ServerEndpoint("/websocket/{sid}")
@Component
public class WebSocketServer {
    //static Log log=LogFactory.get(WebSocketServer.class);
    private static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    //接收sid
    private String sid="";
    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("sid") String sid) {
        this.session = session;
        this.sid=sid;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        log.debug("有新窗口开始监听:"+sid+",当前在线人数为" + getOnlineCount());
        try {
            sendMessage("连接成功");
        } catch (IOException e) {
            log.error("websocket IO异常");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        log.debug("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.debug("收到来自窗口"+sid+"的信息:"+message);
        //群发消息
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }
   /**
     * 实现服务器主动推送
     * The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method的问题在于:handlerA和handlerB两个方法有可能同时执行,当A或者B方法遍历到某一个session并且调用sendMessage发送消息的时候,另外一个方法也正好也在使用相同的session发送另外一个消息(同一个session消息发送冲突了,也就是说同一个时刻,多个线程向一个socket写数据冲突了),就会报TEXT_FULL_WRITING异常。
     * 一般采用的解决方案是使用同步锁加同步发送(session.getBasicRemote())的方式,但很多时候需要异步发送。
     * 在我的项目中,采用Tomcat,TEXT_FULL_WRITING会直接导致websocket连接断掉,采用这种解决方案能保证一个session同时只会在发送一条消息,所以避免了TEXT_FULL_WRITING错误。
     */
    public void sendMessage(String message) throws IOException {
        synchronized(session){
            this.session.getBasicRemote().sendText(message);
        }
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }

    /**
     * 群发自定义消息
     * */
    public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
        log.debug("推送消息到窗口"+sid+",推送内容:"+message);
        for (WebSocketServer item : webSocketSet) {
            try {
                //这里可以设定只推送给这个sid的,为null则全部推送
                if(sid==null) {
                    item.sendMessage(message);
                }else if(item.sid.equals(sid)){
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

2. Vue实现前端连接,心跳机制,重连机制

websocket.js

/*
 * @Description: websocket mixins文件
 * @Author: chenfengtao
 * @Date: 2019-12-13 13:59:12
 * @LastEditors: chenfengtao
 * @LastEditTime: 2020-06-30 10:56:56
 */

/**
  * 1、接收到消息时,开启心跳包;连接关闭或者出错时,停止发送心跳包
  * 2、主动停止连接,重新连接
  * 3、该文件混入到组件中之后,重写ws监听方法
  */

 export const websocket = {
  data () {
    return {
      ws: null,
      // 发送心跳包间隔时间
      timeout: 60000, // 60s
      timer: null,
      limitCount: 0, //重连次数限制
      reconnecttimer: null,
      lockReconnect: false, //避免重复连接,
      heartBeatTxt: '{"HeartBeat": true}' // 心跳包发送的消息内容
    }
  },
  methods: {
    initSocket (wspath) {
      if (typeof (WebSocket) === 'undefined') {
        console.error('浏览器不支持WebSocket')
      } else {
        // 实例化
        this.ws = new WebSocket(wspath)
        // 监听连接成功
        this.ws.onopen = this.onopen
        // 监听接收消息
        this.ws.onmessage = this.onmessage
        // 监听关闭连接
        this.ws.onclose = this.onclose
        // 监听错误消息
        this.ws.onerror = this.onerror
      }
    },
    reconnect(wspath) {
      let _this = this
      debugger
      console.log('已触发重连')
      if(this.lockReconnect) {
        return
      };
      this.lockReconnect = true
      this.limitCount += 1
      this.reconnecttimer && clearTimeout(this.reconnecttimer)
      // if(this.limitCount < 12){
        //没连接上会重连12次,设置延迟避免请求过多
        this.reconnecttimer = setTimeout(function () {
          _this.initSocket(wspath)
          _this.lockReconnect = false
          console.log('第'+_this.limitCount+'次重连!')
        }, 2000)
        // this.limitCount = this.limitCount + 1
      // }
    },
    onopen () {
      console.log('连接成功')
    },
    onmessage () {
      console.log('收到消息')
    },
    onclose () {
      console.log('连接关闭')
    },
    onerror () {
      console.log('连接出错')
    },
    // 主动关闭连接
    stopConnect () {
      this.ws.close()
      this.ws = null
      console.log('取消连接')
    },
    // 主动重连
    // reConnect () {
    //   this.ws = null
    //   this.init()
    // },
    // 开始发送心跳包
    startHeartBeat () {
      debugger
      let _this = this
      this.timer = setInterval(function () {
        if (_this.ws.readyState === 1) {
          _this.ws.send(_this.heartBeatTxt)
        }else {
          console.log('心跳断开,服务端断开,执行ws.close()触发重连!')
          _this.ws.close()
        }
      }, this.timeout)
    },
    // 停止发送心跳包
    stopHeartBeat () {
      clearInterval(this.timer)
    },
    // 停止重连
    // stopReConnect () {
    //   clearInterval(this.reconnecttimer)
    // },
    // 重置心跳包
    resetHeartBeat () {
      clearInterval(this.timer)
      this.startHeartBeat()
    }
  }
}

 

使用的时候在页面引入websocket.js

重写

onmessage,
    onopen () {
      console.log('成功打开连接')
      debugger
      // 发送心跳包
      this.startHeartBeat()
    },
    onclose () {
      console.log('连接关闭,重新发起连接')
      this.reconnect(this.wspath)
    },
    onerror () {
      console.log('连接出错,重新发起连接')
      this.reconnect(this.wspath)
    },

这些方法实现自己的业务逻辑,断开重连,出错重连,心跳请求状态不为1,断开连接触发onclose里的重连,在页面destroyed时关闭连接

destroyed () {
    if (this.ws) {
      console.log('取消连接destroyed')
      this.stopConnect()
      this.stopHeartBeat()
    }
  }

3.参考

https://blog.csdn.net/weixin_41358306/article/details/102582203


全部评论: 0

    我有话说: