應用背景需求:
目前通過SparkStreaming實時讀取到了Kafka的汽車的實時的位置數據,將其保證在Redis緩存中,現在需要每隔5秒,將redis的實時的最新汽車的位置信息,通過websocket 將信息推送到頁面瀏覽器,以便動態在地圖上顯示汽車的位置信息。
redis保存的數據格式用的是普通的kv
key是汽車的終端編號,value 是JOSN的字符串,要素包括,采集時間,汽車編號,經度,維度,車速,油耗,朝向等信息
應用websocket知識
原理參考:https://www.cnblogs.com/qq99514925/p/12258036.html
代碼實現:
服務端代碼:
需要編寫一個websocket服務端代碼:如下
import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * WebSock服務端代碼 */ public class MyWebSockServer extends WebSocketServer{ private AtomicInteger couts=new AtomicInteger(0);//統計連接數 private Map<WebSocket,ClientInfo> clients= new ConcurrentHashMap<WebSocket,ClientInfo>();//客戶端連接集合 public MyWebSockServer() throws UnknownHostException { } public MyWebSockServer(int port) throws UnknownHostException { super(new InetSocketAddress(port)); System.out.println("websocket Server start at port:"+port); } //連接被打開的時候執行 public void onOpen(WebSocket conn, ClientHandshake handshake) { System.out.println("---創建新連接-------"+handshake.getResourceDescriptor()); System.out.print("hostname:"+conn.getRemoteSocketAddress().getHostName()); System.out.print("address:"+conn.getRemoteSocketAddress().getAddress()); System.out.print("port:"+conn.getRemoteSocketAddress().getPort()); ClientInfo cinfo = new ClientInfo(); cinfo.setCliURI(handshake.getResourceDescriptor()); cinfo.setClinetIP(conn.getRemoteSocketAddress().getHostName()); if(!clients.containsKey(conn)){ clients.put(conn,cinfo); int currcount = couts.incrementAndGet(); System.out.println("當前連接數"+currcount); } } //連接被關閉的時候被調用 public void onClose(WebSocket conn, int code, String reason, boolean remote) { System.out.println("---連接被關閉------------"); System.out.println(" coed:"+code+" reason:"+reason+" remote:"+remote); //清除連接 if(clients.containsKey(conn)){ clients.remove(conn); System.out.println("conn:"+conn+" is closed"); int currcount = couts.decrementAndGet(); System.out.println("當前連接:"+currcount); } } //收到客戶端消息的時候調用 public void onMessage(WebSocket conn, String message) { System.out.println("--收到客戶端發過來的消息---"+message); } //連接報錯的時候調用 public void onError(WebSocket conn, Exception ex) { } //啟動時候調用 public void onStart() { //啟動執行的方法 System.out.println("--啟動服務端------"); } //向所有的終端發送消息 public void sendMessToAll(String message){ System.out.println("准備發送消息:"+message); for(Map.Entry<WebSocket,ClientInfo> entry:clients.entrySet()){ entry.getKey().send(message); } } public static void main(String[] args) throws UnknownHostException { MyWebSockServer server = new MyWebSockServer(8887); server.start(); //啟動Redis監聽線程,接受Redis消息 Thread thread = new Thread(new RedisMessConsumer(server)); thread.start(); } }
讀取redis的線程代碼如下
import com.dtinone.util.ProperitesUtile; import com.dtinone.util.redis.RedisSink; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import java.util.Properties; import java.util.Set; /** * Redis消息訂閱,通過websocket進行推送到前端 * */ public class RedisMessConsumer implements Runnable{ public static final String CHANNEL_KEY = "channe:carinfo";//頻道 private MyWebSockServer server; public RedisMessConsumer(MyWebSockServer server) { this.server = server; //創建Jedis連接 } public void run() { // 接受Redis的消息並消費 Properties prop = ProperitesUtile.getProp("redis.properties"); RedisSink redisSink = RedisSink.apply(prop); JedisPool jedisPool = redisSink.getJedisPool(); Jedis jedis = jedisPool.getResource(); //線程執行到這里將被阻塞,一直接收,知道調用unsubstri /*jedis.subscribe(new MyJedisPubSub(),CHANNEL_KEY); System.out.println("訂閱結束");*/ while(true){ try { consumerAllkeys(jedis);//每隔5秒廣播車輛信息到前台客戶端 } catch (InterruptedException e) { e.printStackTrace(); } } } //查詢所有的key的信息發送 public void consumerAllkeys(Jedis jedis) throws InterruptedException { Thread.sleep(5000);//每隔5秒發送一次全部信息 jedis.select(2); Set<String> keys = jedis.keys("*"); for(String key:keys){ String value = jedis.get(key);//取出消費信息 server.sendMessToAll(value); } } class MyJedisPubSub extends JedisPubSub{ @Override public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message); //super.onMessage(channel, message); if("quite".equals(message)){ unsubscribe(CHANNEL_KEY);//取消訂閱 } else { server.sendMessToAll(message); } } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); } } }
RedisSink的Scala代碼如下:獲取redis連接池子
package com.dtinone.util.redis import java.util.Properties import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool class RedisSink(createPool:()=>JedisPool) extends Serializable { private lazy val jedispool:JedisPool=createPool() def getJedisPool(): JedisPool ={ jedispool } } object RedisSink{ //伴生對象 def apply(conf:Properties): RedisSink = { val createPool=()=>{ /*val host = conf.getProperty("redis.host").toString val port = conf.getProperty("redis.port'").toString.toInt*/ val host="dtinone20" val port=6379 val redisTimeout = 30000//超時時間 val poolConfig = new GenericObjectPoolConfig() poolConfig.setMaxIdle(100) poolConfig.setMaxTotal(100) val pool = new JedisPool(poolConfig,host,port,redisTimeout) sys.addShutdownHook{ pool.close() } pool } new RedisSink(createPool) } }
頁面瀏覽器代碼
<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>websockt連接</title> <script type="text/javascript"> var ws=null; //獲取連接 function websock(){ //判斷瀏覽器是否支持 if("WebSocket" in window){ alert("您的瀏覽器支持 WebSocket!"); ws= new WebSocket("ws://localhost:8887/webscoket"); //連接打開調用的方法 ws.onopen=function(){ setMessageInnerHTML("連接打開") } //接收到服務器的消息 ws.onmessage=function(event){ var receive_mess=event.data setMessageInnerHTML("接收到服務端消息:"+receive_mess) } //連接關閉調用 ws.onclose=function(){ setMessageInnerHTML("連接被關閉") } //連接發送錯誤的時候調用 ws.onerror=function(){ setMessageInnerHTML("方式錯誤") } } else{ alert("您的瀏覽器不支持 WebSocket!"); } } function setMessageInnerHTML(mess){ var inner= document.getElementById("div2").innerHTML document.getElementById("div2").innerHTML=inner+"<br>"+mess } function clearHtml(){ document.getElementById("div2").innerHTML=""; } function closeWebScokte(){ if(ws !=null){ ws.close() } } //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。 window.onbeforeunload = function() { closeWebScokte() } </script> </head> <body> <div id="div1"> <input type="button" onclick="websock()" value="連接websocke" /> <input type="button" onclick="closeWebScokte()" value="關閉連接" /> <input type="button" onclick="clearHtml()" value="清空信息" /> </div> <div id="div2"> </div> </body> </html>