package com.cloud.retrieve.service.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.cloud.retrieve.filter.AuthNoneIgnore; import com.cloud.retrieve.service.EsDataService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; @Slf4j @ServerEndpoint(value = "/alarmWebSocketServer/{userId}/{id}") @Component public class MyWebSocket { //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0; //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private Long picDate; /* @Autowired private EsDataService esDataService;*/ public static EsDataService esDataService ; private HashMap curMap = new HashMap<>(); private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); /** * 连接建立成功调用的方法*/ @OnOpen public void onOpen(Session session){ this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在线数加1 log.info("有新连接加入!当前在线人数为" + getOnlineCount()); try { JSONObject alarmData = esDataService.findEsAllAlarmDataByPage(1, 200,null,null); alarmData.put("data", alarmData.getJSONArray("equList")); alarmData.remove("equList"); alarmData.put("success", true); String s = alarmData.toJSONString(); sendMessage(s); JSONArray equList = alarmData.getJSONArray("data"); String dateStr = ((JSONObject)equList.get(equList.size()-1)).getString("picDate"); try { this.picDate = sdf.parse(dateStr).getTime(); } catch (ParseException e) { e.printStackTrace(); } // this.picDate = Timestamp.valueOf(((JSONObject)equList.get(equList.size()-1)).getString("picDate")); // this.likeDate = ((JSONObject)equList.get(equList.size()-1)).getSqlDate("picDate"); } catch (IOException e){ System.out.println("IO异常;"+e); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose(){ webSocketSet.remove(this); //从set中删除 subOnlineCount(); //在线数减1 log.info("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) { log.info("来自客户端的消息:" + message); //群发消息 /* for (MyWebSocket item : webSocketSet){ try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } }*/ } /** * 发生错误时调用 * */ @OnError public void onError(Session session, Throwable error) { log.info("推送数据发生错误"); error.printStackTrace(); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } /** * 群发自定义消息 * */ public static void sendInfo() throws IOException { while(true){ try { Thread.sleep(2000); JSONObject alarmData = esDataService.findEsAllAlarmDataByPage(1, 200,null,null); for (MyWebSocket item : webSocketSet){ try { if((alarmData !=null && alarmData.get("total")!= null && Integer.parseInt(alarmData.getString("total")) > 0)){ JSONArray curAlarmList = alarmData.getJSONArray("equList"); List sendList = new ArrayList<>(); Long lastLikeDate = null; if(item.picDate !=null){//socket获取过数据 lastLikeDate = item.picDate; for(int i=0;i < curAlarmList.size();i++){ JSONObject obj = (JSONObject) curAlarmList.get(i); Long sTime =null; try { sTime = sdf.parse(obj.getString("picDate")).getTime(); } catch (ParseException e) { e.printStackTrace(); } // Long sTime = Timestamp.valueOf(obj.getString("picDate")); // Date sTime = obj.getSqlDate("likeDate"); String recordId = obj.get("Id").toString(); if(sTime > lastLikeDate && !item.curMap.containsKey(recordId)){ item.curMap.put(recordId, obj); sendList.add(obj); } } }else{ for(int i=0;i < curAlarmList.size();i++){ JSONObject obj = (JSONObject) curAlarmList.get(i); String recordId = obj.get("Id").toString(); item.curMap.put(recordId, obj); sendList.add(obj); } } if(sendList.size() > 0){ JSONObject sData = new JSONObject(); sData.put("total",alarmData.getString("total")); sData.put("data", sendList); sData.put("success", true); if(sendList.size()<20) log.info("boot==ws推送es数据:"+sendList); item.sendMessage(sData.toJSONString()); String dateStr = ((JSONObject)sendList.get(sendList.size()-1)).getString("picDate"); try { // log.info("picDate上次比对时间:"+sdf.parse(dateStr).getTime()); item.picDate = sdf.parse(dateStr).getTime(); } catch (ParseException e) { e.printStackTrace(); } // Timestamp sTime = Timestamp.valueOf(((JSONObject)sendList.get(sendList.size()-1)).getString("picDate")); // lastLikeDate = ((JSONObject)sendList.get(sendList.size()-1)).getSqlDate("likeDate"); // log.info("websocket推送es数据 startTime:"+lastLikeDate); // item.picDate = lastLikeDate; } } } catch (Exception e) { e.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { MyWebSocket.onlineCount++; } public static synchronized void subOnlineCount() { MyWebSocket.onlineCount--; } }