应用背景需求:
目前通过SparkStreaming实时读取到了Kafka的汽车的实时的位置数据,将其保证在Redis缓存中,现在需要每隔5秒,将redis的实时的最新汽车的位置信息,通过websocket 将信息推送到页面浏览器,以便动态在地图上显示汽车的位置信息。
redis保存的数据格式用的是普通的kv
key是汽车的终端编号,value 是JOSN的字符串,要素包括,采集时间,汽车编号,经度,维度,车速,油耗,朝向等信息
应用websocket知识
原理参考:https://www.cnblogs.com/qq99514925/p/12258036.html
代码实现:
服务端代码:
需要编写一个websocket服务端代码:如下
import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * WebSock服务端代码 */ public class MyWebSockServer extends WebSocketServer{ private AtomicInteger couts=new AtomicInteger(0);//统计连接数 private Map<WebSocket,ClientInfo> clients= new ConcurrentHashMap<WebSocket,ClientInfo>();//客户端连接集合 public MyWebSockServer() throws UnknownHostException { } public MyWebSockServer(int port) throws UnknownHostException { super(new InetSocketAddress(port)); System.out.println("websocket Server start at port:"+port); } //连接被打开的时候执行 public void onOpen(WebSocket conn, ClientHandshake handshake) { System.out.println("---创建新连接-------"+handshake.getResourceDescriptor()); System.out.print("hostname:"+conn.getRemoteSocketAddress().getHostName()); System.out.print("address:"+conn.getRemoteSocketAddress().getAddress()); System.out.print("port:"+conn.getRemoteSocketAddress().getPort()); ClientInfo cinfo = new ClientInfo(); cinfo.setCliURI(handshake.getResourceDescriptor()); cinfo.setClinetIP(conn.getRemoteSocketAddress().getHostName()); if(!clients.containsKey(conn)){ clients.put(conn,cinfo); int currcount = couts.incrementAndGet(); System.out.println("当前连接数"+currcount); } } //连接被关闭的时候被调用 public void onClose(WebSocket conn, int code, String reason, boolean remote) { System.out.println("---连接被关闭------------"); System.out.println(" coed:"+code+" reason:"+reason+" remote:"+remote); //清除连接 if(clients.containsKey(conn)){ clients.remove(conn); System.out.println("conn:"+conn+" is closed"); int currcount = couts.decrementAndGet(); System.out.println("当前连接:"+currcount); } } //收到客户端消息的时候调用 public void onMessage(WebSocket conn, String message) { System.out.println("--收到客户端发过来的消息---"+message); } //连接报错的时候调用 public void onError(WebSocket conn, Exception ex) { } //启动时候调用 public void onStart() { //启动执行的方法 System.out.println("--启动服务端------"); } //向所有的终端发送消息 public void sendMessToAll(String message){ System.out.println("准备发送消息:"+message); for(Map.Entry<WebSocket,ClientInfo> entry:clients.entrySet()){ entry.getKey().send(message); } } public static void main(String[] args) throws UnknownHostException { MyWebSockServer server = new MyWebSockServer(8887); server.start(); //启动Redis监听线程,接受Redis消息 Thread thread = new Thread(new RedisMessConsumer(server)); thread.start(); } }
读取redis的线程代码如下
import com.dtinone.util.ProperitesUtile; import com.dtinone.util.redis.RedisSink; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import java.util.Properties; import java.util.Set; /** * Redis消息订阅,通过websocket进行推送到前端 * */ public class RedisMessConsumer implements Runnable{ public static final String CHANNEL_KEY = "channe:carinfo";//频道 private MyWebSockServer server; public RedisMessConsumer(MyWebSockServer server) { this.server = server; //创建Jedis连接 } public void run() { // 接受Redis的消息并消费 Properties prop = ProperitesUtile.getProp("redis.properties"); RedisSink redisSink = RedisSink.apply(prop); JedisPool jedisPool = redisSink.getJedisPool(); Jedis jedis = jedisPool.getResource(); //线程执行到这里将被阻塞,一直接收,知道调用unsubstri /*jedis.subscribe(new MyJedisPubSub(),CHANNEL_KEY); System.out.println("订阅结束");*/ while(true){ try { consumerAllkeys(jedis);//每隔5秒广播车辆信息到前台客户端 } catch (InterruptedException e) { e.printStackTrace(); } } } //查询所有的key的信息发送 public void consumerAllkeys(Jedis jedis) throws InterruptedException { Thread.sleep(5000);//每隔5秒发送一次全部信息 jedis.select(2); Set<String> keys = jedis.keys("*"); for(String key:keys){ String value = jedis.get(key);//取出消费信息 server.sendMessToAll(value); } } class MyJedisPubSub extends JedisPubSub{ @Override public void onMessage(String channel, String message) { System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message); //super.onMessage(channel, message); if("quite".equals(message)){ unsubscribe(CHANNEL_KEY);//取消订阅 } else { server.sendMessToAll(message); } } @Override public void unsubscribe(String... channels) { super.unsubscribe(channels); } } }
RedisSink的Scala代码如下:获取redis连接池子
package com.dtinone.util.redis import java.util.Properties import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool class RedisSink(createPool:()=>JedisPool) extends Serializable { private lazy val jedispool:JedisPool=createPool() def getJedisPool(): JedisPool ={ jedispool } } object RedisSink{ //伴生对象 def apply(conf:Properties): RedisSink = { val createPool=()=>{ /*val host = conf.getProperty("redis.host").toString val port = conf.getProperty("redis.port'").toString.toInt*/ val host="dtinone20" val port=6379 val redisTimeout = 30000//超时时间 val poolConfig = new GenericObjectPoolConfig() poolConfig.setMaxIdle(100) poolConfig.setMaxTotal(100) val pool = new JedisPool(poolConfig,host,port,redisTimeout) sys.addShutdownHook{ pool.close() } pool } new RedisSink(createPool) } }
页面浏览器代码
<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>websockt连接</title> <script type="text/javascript"> var ws=null; //获取连接 function websock(){ //判断浏览器是否支持 if("WebSocket" in window){ alert("您的浏览器支持 WebSocket!"); ws= new WebSocket("ws://localhost:8887/webscoket"); //连接打开调用的方法 ws.onopen=function(){ setMessageInnerHTML("连接打开") } //接收到服务器的消息 ws.onmessage=function(event){ var receive_mess=event.data setMessageInnerHTML("接收到服务端消息:"+receive_mess) } //连接关闭调用 ws.onclose=function(){ setMessageInnerHTML("连接被关闭") } //连接发送错误的时候调用 ws.onerror=function(){ setMessageInnerHTML("方式错误") } } else{ alert("您的浏览器不支持 WebSocket!"); } } function setMessageInnerHTML(mess){ var inner= document.getElementById("div2").innerHTML document.getElementById("div2").innerHTML=inner+"<br>"+mess } function clearHtml(){ document.getElementById("div2").innerHTML=""; } function closeWebScokte(){ if(ws !=null){ ws.close() } } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function() { closeWebScokte() } </script> </head> <body> <div id="div1"> <input type="button" onclick="websock()" value="连接websocke" /> <input type="button" onclick="closeWebScokte()" value="关闭连接" /> <input type="button" onclick="clearHtml()" value="清空信息" /> </div> <div id="div2"> </div> </body> </html>