SpringBoot集成WebSocket,前端使用Vue

  

先搭建后端,初始化一个SpringBoot项目,添加如下依赖

<dependency>
  	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

下图是目录结构

1)新建实体类 User


import lombok.Data;

@Data
public class User {
    private Integer userId;

    private String nickName;
    private String headImageURL;

}

2)创建websocket 配置类

@Configuration
public class WebSocketConfig {
    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3)封装工具类

package com.example.websocket.utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;

/**
 *
 * @Description: 自定义响应数据结构
 *                 这个类是提供给门户,ios,安卓,微信商城用的
 *                 门户接受此类数据后需要使用本类的方法转换成对于的数据类型格式(类,或者list)
 *                 其他自行处理
 *                 200:表示成功
 *                 500:表示错误,错误信息在msg字段中
 *                 501:bean验证错误,不管多少个错误都以map形式返回
 *                 502:拦截器拦截到用户token出错
 *                 555:异常抛出信息
 * Copyright: Copyright (c) 2016
 * Company:Nathan.Lee.Salvatore
 */
public class JsonResult {

    // 定义jackson对象
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // 响应业务状态
    private Integer status;

    // 响应消息
    private String msg;

    // 响应中的数据
    private Object data;

    private String ok;    // 不使用

    public static JsonResult build(Integer status, String msg, Object data) {
        return new JsonResult(status, msg, data);
    }

    public static JsonResult ok(Object data) {
        return new JsonResult(data);
    }

    public static JsonResult ok() {
        return new JsonResult(null);
    }

    public static JsonResult errorMsg(String msg) {
        return new JsonResult(500, msg, null);
    }

    public static JsonResult errorMap(Object data) {
        return new JsonResult(501, "error", data);
    }

    public static JsonResult errorTokenMsg(String msg) {
        return new JsonResult(502, msg, null);
    }

    public static JsonResult errorException(String msg) {
        return new JsonResult(555, msg, null);
    }

    public JsonResult() {

    }

//    public static LeeJSONResult build(Integer status, String msg) {
//        return new LeeJSONResult(status, msg, null);
//    }

    public JsonResult(Integer status, String msg, Object data) {
        this.status = status;
        this.msg = msg;
        this.data = data;
    }

    public JsonResult(Object data) {
        this.status = 200;
        this.msg = "OK";
        this.data = data;
    }

    public Boolean isOK() {
        return this.status == 200;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    /**
     *
     * @Description: 将json结果集转化为LeeJSONResult对象
     *                 需要转换的对象是一个类
     * @param jsonData
     * @param clazz
     * @return
     *
     * @author leechenxiang
     * @date 2016年4月22日 下午8:34:58
     */
    public static JsonResult formatToPojo(String jsonData, Class<?> clazz) {
        try {
            if (clazz == null) {
                return MAPPER.readValue(jsonData, JsonResult.class);
            }
            JsonNode jsonNode = MAPPER.readTree(jsonData);
            JsonNode data = jsonNode.get("data");
            Object obj = null;
            if (clazz != null) {
                if (data.isObject()) {
                    obj = MAPPER.readValue(data.traverse(), clazz);
                } else if (data.isTextual()) {
                    obj = MAPPER.readValue(data.asText(), clazz);
                }
            }
            return build(jsonNode.get("status").intValue(), jsonNode.get("msg").asText(), obj);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     *
     * @Description: 没有object对象的转化
     * @param json
     * @return
     *
     * @author leechenxiang
     * @date 2016年4月22日 下午8:35:21
     */
    public static JsonResult format(String json) {
        try {
            return MAPPER.readValue(json, JsonResult.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     *
     * @Description: Object是集合转化
     *                 需要转换的对象是一个list
     * @param jsonData
     * @param clazz
     * @return
     *
     * @author leechenxiang
     * @date 2016年4月22日 下午8:35:31
     */
    public static JsonResult formatToList(String jsonData, Class<?> clazz) {
        try {
            JsonNode jsonNode = MAPPER.readTree(jsonData);
            JsonNode data = jsonNode.get("data");
            Object obj = null;
            if (data.isArray() && data.size() > 0) {
                obj = MAPPER.readValue(data.traverse(),
                        MAPPER.getTypeFactory().constructCollectionType(List.class, clazz));
            }
            return build(jsonNode.get("status").intValue(), jsonNode.get("msg").asText(), obj);
        } catch (Exception e) {
            return null;
        }
    }

    public String getOk() {
        return ok;
    }

    public void setOk(String ok) {
        this.ok = ok;
    }

}

4)新建websock 核心类

package com.example.websocket.config;

import com.example.websocket.model.User;
import com.example.websocket.utils.JsonResult;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

@RestController
@ServerEndpoint(value = "/websocket/{sid}/{type}/{userId}")   //房间号、类型(1,直播聊天)、用户Id、
//@ServerEndpoint(value = "/websocket/{sid}")   //房间号、类型(1,直播聊天)、用户Id、
@Component
@Slf4j
public class WebSocket {
    @Autowired
    private static WebSocket webSocket;
    private static Logger logger = LoggerFactory.getLogger(WebSocket.class);

    //当前连接数
    private static int onlineCount = 0;

    //存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();

    //与某个客户端的连接会话
    private Session session;

    //客户端唯一标识sid(直播ID)
    private String sid = "";
    //用户类型
    private Integer type = 0;
    //用户ID
    private Integer userId = 0;
    //用户昵称
    private String nickName = "";
    //用户头像地址
    private String headImageUrl = "";

    //    @Autowired
//    private  UserRepository userRepository;
    public static void main(String[] args) throws IOException {
        webSocket.sendMessage("ttt");

    }



    private Integer count = 0;

    public void sendMessage(String message) throws IOException {

        this.session.getBasicRemote().sendText(message);
    }


    @PostConstruct
    public void init() {
        webSocket = this;
//        webSocket.userRepository = this.userRepository;
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sid") String sid, @PathParam("type") Integer type, @PathParam("userId") Integer userId) {
        moreWindow(sid, userId, type);
        //在线数加1
        addOnlineCount();
        this.session = session;
        //加入set中
        webSocketSet.add(this);
        this.sid = sid;
        this.userId = userId;
        this.type = type;
//        User user=WebSocket.webSocket.userRepository.findById(userId).get();
        User user = new User();
        user.setUserId(1);
        user.setHeadImageURL("http://cusndcni.png");
        user.setNickName("张三");
        this.nickName = user.getNickName();
        this.headImageUrl = user.getHeadImageURL();
        logger.info("用户ID:" + userId + "用户昵称:" + nickName + "新连接:sid=" + sid + " 当前在线人数" + getOnlineCount());
        try {
            sendMessage("连接成功");
//            sendMessageToClient();
        } catch (IOException e) {
            logger.error("websocket IO异常");
        }
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        //群发消息
        for (WebSocket item : webSocketSet) {
            try {
                if (item.sid.equals(this.sid)) {
                    item.sendMessage(message);
                    System.out.println("--------------------" + message + "总人数" + onlineCount);
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 同一用户打开多个窗口问题
     *
     * @param sid
     */
    public void moreWindow(String sid, Integer userId, Integer type) {
        if (StringUtils.isEmpty(sid)) {
            return;
        }
        if (webSocketSet.isEmpty()) {
            return;
        }
        for (WebSocket item : webSocketSet) {
            if (item.sid.equals(sid) && item.userId.equals(userId) && item.type == type) {
                //已经有相同的了
                webSocketSet.remove(item);
                subOnlineCount();
            }
        }
    }

    /**
     * 发送消息给指定用户
     *
     * @param message
     * @param sid
     */
    public static void sendMessage(String message, @PathParam("sid") String sid) {
        logger.info("发送消息:sid=" + sid + " message:" + message);
        for (WebSocket item : webSocketSet) {
            try {
                if (sid == null) {
                    System.out.println("Runnable 处调用2");

                    item.sendMessage(message);
                    System.out.println("+++++++++++++++" + message);
                } else if (item.sid.equals(sid)) {
                    logger.info("开始发送消息:sid=" + sid);
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                logger.error("发送消息失败 sid:" + sid, e);
                continue;
            }
        }
    }

    @OnClose
    public void onClose() {
        logger.info("连接关闭:sid=" + sid + " 当前在线人数" + getOnlineCount());
        webSocketSet.remove(this);
        subOnlineCount();
    }

    @OnError
    public void one rror(Session session, Throwable error) {
        error.printStackTrace();
    }

    /**
     * 当前在线人数
     *
     * @return
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * 添加在线人数
     */
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }

    /**
     * 减少在线人数
     */
    public static synchronized void subOnlineCount() {
        if (WebSocket.onlineCount <= 0) {
            WebSocket.onlineCount = 0;
            return;
        }
        WebSocket.onlineCount--;
    }

    /**
     * 人数列表
     */
    @PostMapping(value = "/numberList")
    public JsonResult numberList(@RequestParam(value = "sid") String sid, @RequestParam(value = "type") Integer type) {
        Map map = new HashMap<>();
        List<User> userList = new ArrayList<>();
        Integer count = 0;
        for (WebSocket item : webSocketSet) {
            if (item.sid != null && item.sid.equals(sid) && item.type == type) {
                User user = new User();
                user.setNickName(item.nickName);
                user.setUserId(item.userId);
                user.setHeadImageURL(item.headImageUrl);
                userList.add(user);
                count++;
            }
        }
        map.put("userList", userList);
        map.put("count", count);
        return new JsonResult(map);
    }
}

在后端通过定时器向前端推送消息。
法一:在 WebSocket 类中的构造函数中调用定时器。记住,一定得调用有两个参数的 sendMessage 方法


    public WebSocket() {
        testTimer();
    }

    public void testTimer() {
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    //do Something
                    System.out.println(new Date().toString() + ": " + count);
                    count++;
                    String userId = null; // userId 为空时,会推送给连接此 WebSocket 的所有人
                    String message = "testmessage"; // 第三方接口返回数据
//                  sendMessage("连接成功"+ count + "次!");
                    sendMessage(message, userId); // 推送
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 8000, 1000);
    }

法二:在 使用 Scheduled 。在启动类加上 注解 @EnableScheduling,再到要定时的方法上加上 @Scheduled 。记住,一定得调用有两个参数的 sendMessage 方法


    @Scheduled(cron = "15/6 * * * * ?")
    public void sendMessageToClient() {
	  String userId = null; // userId 为空时,会推送给连接此 WebSocket 的所有人
	        String message = "testmessage"; // 第三方接口返回数据
	//                  sendMessage("连接成功"+ count + "次!");
	        sendMessage(message, userId); // 推送
	        System.out.println("Scheduled 处调用");
    }

法三:新建 PushAlarm 类,实现 ApplicationRunner 接口

package com.example.websocket.utils;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component  // 被Spring容器管理
@Order(1)   // 如果多个自定义ApplicationRunner,用来表明执行顺序
public class PushAlarm implements ApplicationRunner {   // 服务启动后自动加载该类

//    @Autowired
//    GasSupport gasSupport;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("------------->" + "项目启动,now =" + new Date());
        this.myTimer();
    }

    public void myTimer() {

        String userId = null; // userId 为空时,会推送给连接此 WebSocket 的所有人

        Runnable runnable1 = new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                while (true) {
//                    String message = gasSupport.GetWasteGasRealData(""); // 第三方接口返回数据
                    String message = "testmessage"; // 第三方接口返回数据
//                    WebSocket.sendMessage(message, userId); // 推送
                    System.out.println("Runnable 处调用");
                    Thread.sleep(5000);
                }
            }
        };

//        Runnable runnable2 = new Runnable() {
//            @SneakyThrows
//            @Override
//            public void run() {
//                while (true) {
//                    String message = gasSupport.GetWasteWaterRealData(""); // 第三方接口返回数据
//                    WebSocketServer.sendInfo(message, userId); // 推送
//                    Thread.sleep(5000);
//                }
//            }
//        };

        Thread thread1 = new Thread(runnable1);
//        Thread thread2 = new Thread(runnable2);

        thread1.start();
//        thread2.start();

    }
}

5)application.properties 中配置 端口 : server.port=8089
ok,至此后端部分完成。开始前端部分。

前端部分
新建一个vue项目,新建一个 WebSocket.vue 组件


在 HelloWorld.vue 组件中引入 WebSocket.vue 组件
然后在 WebSocket.vue 组件中 使用 websocket

<template>
  <div>
    WebSocket
  </div>
</template>
<script>
export default {
  created() {
    this.initWebSocket(); //进入页面开始初始化websocket建立连接
  },
  destroyed() {
    this.websocketclose(); //离开页面断开连接
  },
  data(){
    return{
      type: 1,
      userId: 2,
      sid: 'test'
    }
  },
  methods: {
    initWebSocket() {
      // WebSocket与普通的请求所用协议有所不同,ws等同于http,wss等同于https
      var url = 'ws://127.0.0.1:8089/websocket/' + this.sid + '/' + this.type + '/' + this.userId
      this.websock = new WebSocket(url); //这里是websocket服务地址,这里的地址可以前端根据后台地址参数规则拼成,也可以向后端请求
      // this.websock = new WebSocket("ws://localhost:8089/websocket/1"); //这里是websocket服务地址,这里的地址可以前端根据后台地址参数规则拼成,也可以向后端请求
      this.websock.onopen = this.websocketonopen;
      this.websock.onerror = this.websocketonerror;
      this.websock.onmessage = this.websocketonmessage;
      this.websock.onclose = this.websocketclose;
    },
    websocketonopen() {
      console.log("WebSocket连接成功");
    },
    websocketonerror(e) {
      console.log("WebSocket连接发生错误", e);
    },
    websocketonmessage(e) {
      console.log('收到后台的消息: ', e.data);                // console.log(e);
    },
    websocketclose(e) {
      console.log("connection closed (" + e.code + ")");
    },
  }
}
</script>

相关文章