WebSocket實現讀取Redis的車輛實時數據推送到瀏覽器


應用背景需求:

    目前通過SparkStreaming實時讀取到了Kafka的汽車的實時的位置數據,將其保證在Redis緩存中,現在需要每隔5秒,將redis的實時的最新汽車的位置信息,通過websocket 將信息推送到頁面瀏覽器,以便動態在地圖上顯示汽車的位置信息。

    redis保存的數據格式用的是普通的kv

   key是汽車的終端編號,value 是JOSN的字符串,要素包括,采集時間,汽車編號,經度,維度,車速,油耗,朝向等信息

 應用websocket知識

  原理參考:https://www.cnblogs.com/qq99514925/p/12258036.html

代碼實現:

  服務端代碼:

  需要編寫一個websocket服務端代碼:如下

import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSock服務端代碼
 */
public class MyWebSockServer  extends WebSocketServer{
     private   AtomicInteger couts=new AtomicInteger(0);//統計連接數
     private  Map<WebSocket,ClientInfo> clients= new ConcurrentHashMap<WebSocket,ClientInfo>();//客戶端連接集合

    public MyWebSockServer() throws UnknownHostException {
    }

    public MyWebSockServer(int port) throws UnknownHostException {
        super(new InetSocketAddress(port));
        System.out.println("websocket Server start at port:"+port);
    }
    //連接被打開的時候執行
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        System.out.println("---創建新連接-------"+handshake.getResourceDescriptor());
        System.out.print("hostname:"+conn.getRemoteSocketAddress().getHostName());
        System.out.print("address:"+conn.getRemoteSocketAddress().getAddress());
        System.out.print("port:"+conn.getRemoteSocketAddress().getPort());
        ClientInfo cinfo = new ClientInfo();
        cinfo.setCliURI(handshake.getResourceDescriptor());
        cinfo.setClinetIP(conn.getRemoteSocketAddress().getHostName());
        if(!clients.containsKey(conn)){
            clients.put(conn,cinfo);
            int currcount = couts.incrementAndGet();
            System.out.println("當前連接數"+currcount);
        }
    }

    //連接被關閉的時候被調用
    public void onClose(WebSocket conn, int code, String reason, boolean remote) {
        System.out.println("---連接被關閉------------");
        System.out.println(" coed:"+code+" reason:"+reason+" remote:"+remote);
        //清除連接
        if(clients.containsKey(conn)){
            clients.remove(conn);
            System.out.println("conn:"+conn+" is closed");
            int currcount = couts.decrementAndGet();
            System.out.println("當前連接:"+currcount);
        }

    }
    //收到客戶端消息的時候調用
    public void onMessage(WebSocket conn, String message) {
        System.out.println("--收到客戶端發過來的消息---"+message);
    }
    //連接報錯的時候調用
    public void onError(WebSocket conn, Exception ex) {

    }
    //啟動時候調用
    public void onStart() {
       //啟動執行的方法
        System.out.println("--啟動服務端------");
    }
    //向所有的終端發送消息
    public void sendMessToAll(String message){
          System.out.println("准備發送消息:"+message);
          for(Map.Entry<WebSocket,ClientInfo> entry:clients.entrySet()){
               entry.getKey().send(message);
          }
    }

    public static void main(String[] args) throws UnknownHostException {
        MyWebSockServer server = new MyWebSockServer(8887);
        server.start();
        //啟動Redis監聽線程,接受Redis消息
        Thread thread = new Thread(new RedisMessConsumer(server));
        thread.start();
    }
}

 讀取redis的線程代碼如下

import com.dtinone.util.ProperitesUtile;
import com.dtinone.util.redis.RedisSink;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Properties;
import java.util.Set;

/**
 *  Redis消息訂閱,通過websocket進行推送到前端
 *
 */
public class RedisMessConsumer  implements  Runnable{
    public static final String CHANNEL_KEY = "channe:carinfo";//頻道
    private  MyWebSockServer server;

    public RedisMessConsumer(MyWebSockServer server) {
        this.server = server;
        //創建Jedis連接
    }

    public void run() {
        // 接受Redis的消息並消費
        Properties prop = ProperitesUtile.getProp("redis.properties");
        RedisSink redisSink = RedisSink.apply(prop);
        JedisPool jedisPool = redisSink.getJedisPool();
        Jedis jedis = jedisPool.getResource();
        //線程執行到這里將被阻塞,一直接收,知道調用unsubstri
        /*jedis.subscribe(new MyJedisPubSub(),CHANNEL_KEY);

        System.out.println("訂閱結束");*/
         while(true){
             try {
                 consumerAllkeys(jedis);//每隔5秒廣播車輛信息到前台客戶端
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }


    }
    //查詢所有的key的信息發送
    public void consumerAllkeys(Jedis jedis) throws InterruptedException {
             Thread.sleep(5000);//每隔5秒發送一次全部信息
             jedis.select(2);
             Set<String> keys = jedis.keys("*");
             for(String key:keys){
                 String value = jedis.get(key);//取出消費信息
                 server.sendMessToAll(value);
             }

    }
  class MyJedisPubSub extends JedisPubSub{
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
            //super.onMessage(channel, message);
            if("quite".equals(message)){
                unsubscribe(CHANNEL_KEY);//取消訂閱
            }
            else
            {
                server.sendMessToAll(message);
            }

        }

      @Override
      public void unsubscribe(String... channels) {
          super.unsubscribe(channels);
      }
  }
}

 

    RedisSink的Scala代碼如下:獲取redis連接池子

package com.dtinone.util.redis

import java.util.Properties

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

class RedisSink(createPool:()=>JedisPool) extends  Serializable {
  private  lazy  val jedispool:JedisPool=createPool()

  def getJedisPool(): JedisPool ={
       jedispool
  }
}
object RedisSink{
  //伴生對象
  def apply(conf:Properties): RedisSink = {
       val createPool=()=>{
      /*val host = conf.getProperty("redis.host").toString
      val port = conf.getProperty("redis.port'").toString.toInt*/
      val host="dtinone20"
      val port=6379
      val redisTimeout = 30000//超時時間
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxIdle(100)
      poolConfig.setMaxTotal(100)
      val pool = new JedisPool(poolConfig,host,port,redisTimeout)
       sys.addShutdownHook{
          pool.close()
       }
      pool
    }
     new RedisSink(createPool)
  }
}

頁面瀏覽器代碼

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <title>websockt連接</title>
        <script type="text/javascript">
              var ws=null;
              //獲取連接
              function websock(){
                  //判斷瀏覽器是否支持
                  if("WebSocket" in window){
                       alert("您的瀏覽器支持 WebSocket!");
                       ws=  new WebSocket("ws://localhost:8887/webscoket");
                       //連接打開調用的方法
                       ws.onopen=function(){
                           setMessageInnerHTML("連接打開")
                       }
                       //接收到服務器的消息
                       ws.onmessage=function(event){
                           var receive_mess=event.data
                           setMessageInnerHTML("接收到服務端消息:"+receive_mess)
                       }
                       //連接關閉調用
                       ws.onclose=function(){
                           setMessageInnerHTML("連接被關閉")
                       }
                       //連接發送錯誤的時候調用
                       ws.onerror=function(){
                           setMessageInnerHTML("方式錯誤")
                       }
                       
                       
                  }
                  else{
                     alert("您的瀏覽器不支持 WebSocket!"); 
                  }
              }
              function setMessageInnerHTML(mess){
                var inner=  document.getElementById("div2").innerHTML
                document.getElementById("div2").innerHTML=inner+"<br>"+mess
              }
              function clearHtml(){
                  document.getElementById("div2").innerHTML="";
              }
              function closeWebScokte(){
                  if(ws !=null){
                        ws.close()
                  } 
              }
              //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。 
              window.onbeforeunload = function() { 
                  closeWebScokte()
              } 
        </script>
    </head>
    <body>
         <div id="div1">
             <input type="button" onclick="websock()" value="連接websocke" />
              <input type="button" onclick="closeWebScokte()" value="關閉連接" />
              <input type="button" onclick="clearHtml()" value="清空信息" />
         </div>
         <div id="div2">
             
         </div>
    </body>
</html>

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM