WebSocket实现读取Redis的车辆实时数据推送到浏览器


应用背景需求:

    目前通过SparkStreaming实时读取到了Kafka的汽车的实时的位置数据,将其保证在Redis缓存中,现在需要每隔5秒,将redis的实时的最新汽车的位置信息,通过websocket 将信息推送到页面浏览器,以便动态在地图上显示汽车的位置信息。

    redis保存的数据格式用的是普通的kv

   key是汽车的终端编号,value 是JOSN的字符串,要素包括,采集时间,汽车编号,经度,维度,车速,油耗,朝向等信息

 应用websocket知识

  原理参考:https://www.cnblogs.com/qq99514925/p/12258036.html

代码实现:

  服务端代码:

  需要编写一个websocket服务端代码:如下

import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSock服务端代码
 */
public class MyWebSockServer  extends WebSocketServer{
     private   AtomicInteger couts=new AtomicInteger(0);//统计连接数
     private  Map<WebSocket,ClientInfo> clients= new ConcurrentHashMap<WebSocket,ClientInfo>();//客户端连接集合

    public MyWebSockServer() throws UnknownHostException {
    }

    public MyWebSockServer(int port) throws UnknownHostException {
        super(new InetSocketAddress(port));
        System.out.println("websocket Server start at port:"+port);
    }
    //连接被打开的时候执行
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        System.out.println("---创建新连接-------"+handshake.getResourceDescriptor());
        System.out.print("hostname:"+conn.getRemoteSocketAddress().getHostName());
        System.out.print("address:"+conn.getRemoteSocketAddress().getAddress());
        System.out.print("port:"+conn.getRemoteSocketAddress().getPort());
        ClientInfo cinfo = new ClientInfo();
        cinfo.setCliURI(handshake.getResourceDescriptor());
        cinfo.setClinetIP(conn.getRemoteSocketAddress().getHostName());
        if(!clients.containsKey(conn)){
            clients.put(conn,cinfo);
            int currcount = couts.incrementAndGet();
            System.out.println("当前连接数"+currcount);
        }
    }

    //连接被关闭的时候被调用
    public void onClose(WebSocket conn, int code, String reason, boolean remote) {
        System.out.println("---连接被关闭------------");
        System.out.println(" coed:"+code+" reason:"+reason+" remote:"+remote);
        //清除连接
        if(clients.containsKey(conn)){
            clients.remove(conn);
            System.out.println("conn:"+conn+" is closed");
            int currcount = couts.decrementAndGet();
            System.out.println("当前连接:"+currcount);
        }

    }
    //收到客户端消息的时候调用
    public void onMessage(WebSocket conn, String message) {
        System.out.println("--收到客户端发过来的消息---"+message);
    }
    //连接报错的时候调用
    public void onError(WebSocket conn, Exception ex) {

    }
    //启动时候调用
    public void onStart() {
       //启动执行的方法
        System.out.println("--启动服务端------");
    }
    //向所有的终端发送消息
    public void sendMessToAll(String message){
          System.out.println("准备发送消息:"+message);
          for(Map.Entry<WebSocket,ClientInfo> entry:clients.entrySet()){
               entry.getKey().send(message);
          }
    }

    public static void main(String[] args) throws UnknownHostException {
        MyWebSockServer server = new MyWebSockServer(8887);
        server.start();
        //启动Redis监听线程,接受Redis消息
        Thread thread = new Thread(new RedisMessConsumer(server));
        thread.start();
    }
}

 读取redis的线程代码如下

import com.dtinone.util.ProperitesUtile;
import com.dtinone.util.redis.RedisSink;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Properties;
import java.util.Set;

/**
 *  Redis消息订阅,通过websocket进行推送到前端
 *
 */
public class RedisMessConsumer  implements  Runnable{
    public static final String CHANNEL_KEY = "channe:carinfo";//频道
    private  MyWebSockServer server;

    public RedisMessConsumer(MyWebSockServer server) {
        this.server = server;
        //创建Jedis连接
    }

    public void run() {
        // 接受Redis的消息并消费
        Properties prop = ProperitesUtile.getProp("redis.properties");
        RedisSink redisSink = RedisSink.apply(prop);
        JedisPool jedisPool = redisSink.getJedisPool();
        Jedis jedis = jedisPool.getResource();
        //线程执行到这里将被阻塞,一直接收,知道调用unsubstri
        /*jedis.subscribe(new MyJedisPubSub(),CHANNEL_KEY);

        System.out.println("订阅结束");*/
         while(true){
             try {
                 consumerAllkeys(jedis);//每隔5秒广播车辆信息到前台客户端
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }


    }
    //查询所有的key的信息发送
    public void consumerAllkeys(Jedis jedis) throws InterruptedException {
             Thread.sleep(5000);//每隔5秒发送一次全部信息
             jedis.select(2);
             Set<String> keys = jedis.keys("*");
             for(String key:keys){
                 String value = jedis.get(key);//取出消费信息
                 server.sendMessToAll(value);
             }

    }
  class MyJedisPubSub extends JedisPubSub{
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
            //super.onMessage(channel, message);
            if("quite".equals(message)){
                unsubscribe(CHANNEL_KEY);//取消订阅
            }
            else
            {
                server.sendMessToAll(message);
            }

        }

      @Override
      public void unsubscribe(String... channels) {
          super.unsubscribe(channels);
      }
  }
}

 

    RedisSink的Scala代码如下:获取redis连接池子

package com.dtinone.util.redis

import java.util.Properties

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

class RedisSink(createPool:()=>JedisPool) extends  Serializable {
  private  lazy  val jedispool:JedisPool=createPool()

  def getJedisPool(): JedisPool ={
       jedispool
  }
}
object RedisSink{
  //伴生对象
  def apply(conf:Properties): RedisSink = {
       val createPool=()=>{
      /*val host = conf.getProperty("redis.host").toString
      val port = conf.getProperty("redis.port'").toString.toInt*/
      val host="dtinone20"
      val port=6379
      val redisTimeout = 30000//超时时间
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxIdle(100)
      poolConfig.setMaxTotal(100)
      val pool = new JedisPool(poolConfig,host,port,redisTimeout)
       sys.addShutdownHook{
          pool.close()
       }
      pool
    }
     new RedisSink(createPool)
  }
}

页面浏览器代码

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>websockt连接</title>
        <script type="text/javascript">
              var ws=null;
              //获取连接
              function websock(){
                  //判断浏览器是否支持
                  if("WebSocket" in window){
                       alert("您的浏览器支持 WebSocket!");
                       ws=  new WebSocket("ws://localhost:8887/webscoket");
                       //连接打开调用的方法
                       ws.onopen=function(){
                           setMessageInnerHTML("连接打开")
                       }
                       //接收到服务器的消息
                       ws.onmessage=function(event){
                           var receive_mess=event.data
                           setMessageInnerHTML("接收到服务端消息:"+receive_mess)
                       }
                       //连接关闭调用
                       ws.onclose=function(){
                           setMessageInnerHTML("连接被关闭")
                       }
                       //连接发送错误的时候调用
                       ws.onerror=function(){
                           setMessageInnerHTML("方式错误")
                       }
                       
                       
                  }
                  else{
                     alert("您的浏览器不支持 WebSocket!"); 
                  }
              }
              function setMessageInnerHTML(mess){
                var inner=  document.getElementById("div2").innerHTML
                document.getElementById("div2").innerHTML=inner+"<br>"+mess
              }
              function clearHtml(){
                  document.getElementById("div2").innerHTML="";
              }
              function closeWebScokte(){
                  if(ws !=null){
                        ws.close()
                  } 
              }
              //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 
              window.onbeforeunload = function() { 
                  closeWebScokte()
              } 
        </script>
    </head>
    <body>
         <div id="div1">
             <input type="button" onclick="websock()" value="连接websocke" />
              <input type="button" onclick="closeWebScokte()" value="关闭连接" />
              <input type="button" onclick="clearHtml()" value="清空信息" />
         </div>
         <div id="div2">
             
         </div>
    </body>
</html>

 

 

  


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM