外網服務端保存內網服務端會話的有效性以及平台上監控所有內網服務端的網絡狀況,模仿心跳機制實現,這里在做一點敘訴,關於思路和具體實現。
在很多的平台應用中,都有這樣的需求,平台內包括多個子系統或者屬於其管控范圍內的其他平台,需要對這些系統進行統一的監控,來查看當前的運行狀態或者其他運行信息,我們的應用也有這樣的一個情況,需要再外網服務端(平台)上監控,其下運行的多個內網服務端的網絡狀況,查閱了寫資料后確立了2種可實現的方式。
1:輪詢機制
輪詢:概括來說是服務端定時主動的去與要監控狀態的客戶端(或者叫其他系統)通信,詢問當前的某種狀態,客戶端返回狀態信息,客戶端沒有返回或返回錯誤、失效信息、則認為客戶端已經宕機,然后服務端自己內部把這個客戶端的狀態保存下來(宕機或者其他),如果客戶端正常,那么返回正常狀態,如果客戶端宕機或者返回的是定義的失效狀態那么當前的客戶端狀態是能夠及時的監控到的,如果客戶端宕機之后重啟了那么當服務端定時來輪詢的時候,還是可以正常的獲取返回信息,把其狀態重新更新。
在很多的平台應用中,都有這樣的需求,平台內包括多個子系統或者屬於其管控范圍內的其他平台,需要對這些系統進行統一的監控,來查看當前的運行狀態或者其他運行信息,我們的應用也有這樣的一個情況,需要再外網服務端(平台)上監控,其下運行的多個內網服務端的網絡狀況,查閱了寫資料后確立了2種可實現的方式。
1:輪詢機制
2:心跳機制
輪詢:概括來說是服務端定時主動的去與要監控狀態的客戶端(或者叫其他系統)通信,詢問當前的某種狀態,客戶端返回狀態信息,客戶端沒有返回或返回錯誤、失效信息、則認為客戶端已經宕機,然后服務端自己內部把這個客戶端的狀態保存下來(宕機或者其他),如果客戶端正常,那么返回正常狀態,如果客戶端宕機或者返回的是定義的失效狀態那么當前的客戶端狀態是能夠及時的監控到的,如果客戶端宕機之后重啟了那么當服務端定時來輪詢的時候,還是可以正常的獲取返回信息,把其狀態重新更新。
心跳:最終得到的結果是與輪詢一樣的但是實現的方式有差別,心跳不是服務端主動去發信息檢測客戶端狀態,而是在服務端保存下來所有客戶端的狀態信息,然后等待客戶端定時來訪問服務端,更新自己的當前狀態,如果客戶端超過指定的時間沒有來更新狀態,則認為客戶端已經宕機或者其狀態異常。
心跳機制與輪詢的比較,在我們的應用中,采用的是心跳,這樣一是避免服務端的壓力,二是靈活好控制,上一篇文章中提到過,我們的外網服務端(服務端)不知道內網服務端(客戶端)的地址,有雖然有保存客戶端的socket會話,但是客戶端宕機會話就失效了。所以只能等着他主動來報告狀態。
在來說一下實現方式,這個很簡單,就是一個思路問題。
首先,客戶端(內網服務端)啟動后,帶着自己的標識符與服務端建立socket連接,服務端緩存下來對應信息(上一篇文章中已經實現過了),然后在通過socket流,定時發送當前信息消息到服務端(外網服務器端)某個接口,服務端收到后更新當前的客戶端的狀態,比如(會話地址,標識符,網絡的活躍狀態,連接時間,心跳時間),本次來更新的時間就是心跳時間,然后服務端還有一個定時器,定時檢查所有緩存的客戶端會話集合,將其中心跳時間與當前時間進行對比,如果超過指定的時間還沒有來更新則認為該客戶端的網絡出現異常或者宕機,然后更新該客戶端的網絡狀態。- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.InputStreamReader;
- import java.net.InetSocketAddress;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
- import org.dom4j.Document;
- import org.dom4j.DocumentException;
- import org.dom4j.DocumentHelper;
- import cn.edu.zju.cst.mina.im.server.entity.User;
- import cn.edu.zju.cst.mina.im.server.handler.ServerControler;
- public class UserStateManage extends Thread {
- //在線用戶狀態列表
- static HashMap<Integer, UserState> userStateList = new HashMap<Integer, UserState>();
- Object hashLock = new Object();
- //當前的連接數和工作線程數
- static int workThreadNum = 0;
- static int socketConnect = 0;
- private ServerSocket serverSocket;
- //服務器IP
- private String host = "10.82.81.79";
- //服務器端口
- private int stateReportPort = 60001;
- //設置心跳包的結束標記
- String endFlag = "</protocol>";
- CharSequence csEndFlag = endFlag.subSequence(0, 10);
- //掃描間隔
- private int scanTime = 1800;
- @Override
- public void run() {
- //綁定端口,並開始偵聽用戶的心跳包
- serverSocket = startListenUserReport(stateReportPort);
- if(serverSocket == null){
- System.out.println("【創建ServerSocket失敗!】");
- return;
- }
- //啟動掃描線程
- Thread scanThread = new Thread(new scan());
- scanThread.start();
- //等待用戶心跳包請求
- while(true){
- Socket socket = null;
- try {
- socketConnect = socketConnect + 1;
- //接收客戶端的連接
- socket = serverSocket.accept();
- //為該連接創建一個工作線程
- Thread workThread = new Thread(new Handler(socket));
- //啟動工作線程
- workThread.start();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 創建一個ServerSocket來偵聽用戶心跳包請求
- * @param port 指定的服務器端的端口
- * @return 返回ServerSocket
- * @author dream
- */
- public ServerSocket startListenUserReport(int port){
- try {
- ServerSocket serverSocket = new ServerSocket();
- if(!serverSocket.getReuseAddress()){
- serverSocket.setReuseAddress(true);
- }
- serverSocket.bind(new InetSocketAddress(host,port));
- System.out.println("【開始在"+serverSocket.getLocalSocketAddress()+"上偵聽用戶的心跳包請求!】");
- return serverSocket;
- } catch (IOException e) {
- System.out.println("【端口"+port+"已經被占用!】");
- if (serverSocket != null) {
- if (!serverSocket.isClosed()) {
- try {
- serverSocket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- }
- }
- return serverSocket;
- }
- //工作線程類
- class Handler implements Runnable{
- private Socket socket;
- UserState us = null;
- User newUser = null;
- private int userId;
- private int userState;
- /**
- * 構造函數,從調用者那里取得socket
- * @param socket 指定的socket
- * @author dream
- */
- public Handler(Socket socket){
- this.socket = socket;
- }
- /**
- * 從指定的socket中得到輸入流
- * @param socket 指定的socket
- * @return 返回BufferedReader
- * @author dream
- */
- private BufferedReader getReader(Socket socket){
- InputStream is = null;
- BufferedReader br = null;
- try {
- is = socket.getInputStream();
- br = new BufferedReader(new InputStreamReader(is));
- } catch (IOException e) {
- e.printStackTrace();
- }
- return br;
- }
- public void run() {
- try{
- workThreadNum = workThreadNum +1;
- System.out.println("【第"+workThreadNum+"個的連接:"+socket.getInetAddress()+":"+socket.getPort()+"】");
- BufferedReader br = getReader(socket);
- String meg = null;
- StringBuffer report = new StringBuffer();
- while ((meg = br.readLine()) != null) {
- report.append(meg);
- if (meg.contains(csEndFlag)) {
- us = getReporterUserState(meg, socket);
- synchronized (hashLock) {
- userStateList.put(userId, us);
- }
- }
- }
- }catch(IOException e){
- System.out.println("【客戶:"+newUser.getUser_id()+"已經斷開連接!】");
- userStateList.remove( userId );
- announceStateChange( userId , -1);
- }finally{
- if(socket != null){
- try {
- //斷開連接
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private UserState getReporterUserState(String meg , Socket socket){
- UserState us = new UserState();
- try {
- Document requestDoc = DocumentHelper.parseText(meg);
- newUser = ServerControler.parseXmlToUserState(requestDoc,socket);
- userId = newUser.getUser_id();
- userState = newUser.getUser_state();
- us.setFlag(2);
- us.setUser_state( userState );
- us.setUser_id( userId );
- us.setUser_ip(newUser.getUser_ip());
- us.setUser_port(newUser.getUser_port());
- } catch (DocumentException e) {
- System.out.println("【來自客戶端的信息不是一個合法的心跳包協議】");
- }
- return us;
- }
- }
- //掃描線程
- class scan implements Runnable{
- public void run() {
- while (true) {
- System.out.println("*******"+new Date()+":掃描線程開始掃描"+"*******");
- synchronized (hashLock) {
- if(!userStateList.isEmpty()){
- //遍歷在線用戶列表
- for (Map.Entry<Integer, UserState> entry : userStateList.entrySet()) {
- int flag = entry.getValue().getFlag();
- if ( (flag - 1) < 0) {
- //在這里通知該用戶的好友其狀態發生改變
- // announceStateChange(entry.getKey() , 0);
- }else{
- entry.getValue().setFlag(flag - 1);
- userStateList.put(entry.getKey(), entry.getValue());
- }
- System.out.println(entry.getKey() + "-->" + entry.getValue().toString());
- }
- }else{
- System.out.println("現在還沒有在線用戶!");
- }
- }
- //實現定時掃描
- try {
- sleep(scanTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private void announceStateChange(int userId , int state){
- System.out.println("通知其好友!");
- }
- /**
- * 查詢一個用戶是否在線
- * @param userId 指定要查詢狀態的用戶的ID
- * @return true 在線; false 不在線;
- * @author dream
- */
- public boolean isAlive(int userId){
- synchronized (hashLock) {
- return userStateList.containsKey(userId);
- }
- }
- /**
- * 返回指定用戶ID的狀態
- * @param userId 指定要查詢狀態的用戶的ID
- * @return >0 該用戶在線; -1 該用戶離線
- * @author dream
- */
- public int getUserState(int userId){
- synchronized (hashLock) {
- if(userStateList.containsKey(userId)){
- return userStateList.get(userId).getUser_state();
- }else{
- return -1;
- }
- }
- }
- public Object getHashLock() {
- return hashLock;
- }
- public void setHashLock(Object hashLock) {
- this.hashLock = hashLock;
- }
- public String getHost() {
- return host;
- }
- public void setHost(String host) {
- this.host = host;
- }
- public int getStateReportPort() {
- return stateReportPort;
- }
- public void setStateReportPort(int stateReportPort) {
- this.stateReportPort = stateReportPort;
- }
- public String getEndFlag() {
- return endFlag;
- }
- public void setEndFlag(String endFlag) {
- this.endFlag = endFlag;
- }
- public int getScanTime() {
- return scanTime;
- }
- public void setScanTime(int scanTime) {
- this.scanTime = scanTime;
- }
- public static HashMap<Integer, UserState> getUserStateList() {
- return userStateList;
- }
- public static int getWorkThreadNum() {
- return workThreadNum;
- }
- public static int getSocketConnect() {
- return socketConnect;
- }
- //測試本函數的main函數
- public static void main(String arg[]){
- UserStateManage usm = new UserStateManage();
- usm.start();
- }
- }