先介紹一下項目需求,主要是服務器接受客服端(電子櫃台)傳來的心跳信息,服務器也能主動發送信息給客戶端
最近看了很多帖子,大多是服務器接受信息,然后被動回應客服端,這里我簡單的做了一個管理客戶的列表。用於指定發送
(當然就是本地操作成功,還未完全測試,應該還是存在很多Bug,僅供參考!!)
先說說Nio,這里就直接貼大神的鏈接啦:https://gitbook.cn/books/5b1792ad26a49a55324e782c/index.html
還有就是這里這套學校管理老師的比喻,就很形象,比較適合我這種菜鳥理解:https://www.cnblogs.com/wcyBlog/p/4716676.html
然后就是代碼了!!!!!
服務器代碼如下:
package com.js.archive.tcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.HashMap;
import java.util.Iterator;
public class TcpServer {
//緩沖區長度
private static final int BUF_SIZE = 28000;
//select 方法等待星道准備好的最長時間
private static final int timeout = 3000;
public static Selector selector;
/**
* 發送消息的開關
*/
public static boolean signal = false;
/**
* 發送消息的內容
*/
public static String message= "";
public static void main(String[] args) throws IOException {
init("192.168.0.166",1006);
new ServerWriteThread();
tcpServer();
}
public static void init(String hostname,Integer port) throws IOException {
if (selector == null) {
synchronized (TcpServer.class) {
if (selector == null) {
//創建一個選擇器
selector = Selector.open();
//實例化一個信道
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//將該信道綁定到指定端口
InetSocketAddress inetSocketAddress = new InetSocketAddress(hostname, port);
socketChannel .bind(inetSocketAddress);
System.out.println("服務端套接字"+socketChannel .socket().getLocalSocketAddress());
//配置信道為非阻塞模式
socketChannel.configureBlocking(false);
//將選擇器注冊到各個信道
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
}
}
}
public static void tcpServer() throws IOException {
//創建一個實現了協議接口的對象
TCPProtocol protocol = new EchoSelectorProtocol(BUF_SIZE);
while (true) {
//一直等待,直至到准備好了io操作
if (selector.select(timeout) == 0) {
//等待TCP receive data:
System.out.println("write...");
continue;
}
//獲取准備好的信道所關聯的Key集合的iterator實例
Iterator<SelectionKey> keyIterable = selector.selectedKeys().iterator();
//循環取得集合中的每個鍵值
while (keyIterable.hasNext()) {
SelectionKey key = keyIterable.next();
// 處理事件
if (key.isAcceptable()) {
protocol.handleAccept(key);
}
//讀取信息
if (key.isReadable()) {
protocol.handleRead(key);
}
//判斷並發送信息
if(key.isValid() && key.isWritable()){
if(signal == true){
protocol.handleWrite(key);
}
}
//這里需要手動從鍵集中移除當前的key
keyIterable.remove();
}
signal = false;
}
}
}
package com.js.archive.tcp;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.util.HashMap;
public interface TCPProtocol {
/**
* 接收一個SocketChannel的處理
* @param key
* @throws IOException
*/
void handleAccept(SelectionKey key) throws IOException;
/**
* 從一個SocketChannel讀取信息的處理
* @param key
* @throws IOException
*/
void handleRead(SelectionKey key) throws IOException;
/**
* 向一個SocketChannel寫入信息的處理
* @param key
* @throws IOException
*/
void handleWrite(SelectionKey key) throws IOException;
}
package com.js.archive.tcp;
import com.js.archive.tcp.messageHandler.HeartMessageHandler;
import com.js.archive.tcp.messageHandler.HumitureMessageHandler;
import com.js.archive.tcp.messageHandler.RFBoardFailure;
import com.js.archive.tcp.messageHandler.TcpMessage;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class EchoSelectorProtocol implements TCPProtocol {
private int bufSize;//緩沖區長度
public EchoSelectorProtocol(int bufSize) {
this.bufSize = bufSize;
}
/**
* 服務端信道已經准備好了接收新的客戶端連接
*/
// 處理連接事件
@Override
public void handleAccept(SelectionKey key) throws IOException {
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
channel.configureBlocking(false);
System.out.println("客戶端套接字:"+channel.getRemoteAddress());
//將選擇器注冊到連接到的客戶端信道,並指定該信道key值的屬性為OP_READ,同時為該信道指定關聯的附件
channel.register(key.selector(), SelectionKey.OP_READ);
}
/**
* 客戶端信道已經准備好了從信道中讀取數據到緩沖區
*/
// 處理讀事件
@Override
public void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buff = ByteBuffer.allocate(bufSize);
//如果read()方法返回-1說明客戶端關閉了鏈接,那么客戶端已經接收到了與自己發送字節數
long bytesRead=0;
try{
bytesRead = channel.read(buff);
}catch(IOException e){
key.cancel();
channel.socket().close();
channel.close();
return;
}
if (bytesRead <=0) {
channel.close();
} else if (bytesRead > 0) {
buff.flip();
byte[] array = new byte[buff.remaining()];
buff.get(array);
//todo xurunfei 連幀 $sdfasdf#$dsfdsf#處理
//n內網公網是否都可以
String dataSum = new String(array);
List<String> datalist =getMessagelist(dataSum);
for (String data:datalist
) {
if (!data.contains("$") && !data.contains("#")) {
handlerMessage(data,key);
} else {
log.error("tcp data format error {}", data);
}
}
//如果緩沖區總讀入了數據,則將該信道感興趣的炒作設置為可讀可寫
// key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
key.interestOps(SelectionKey.OP_READ| SelectionKey.OP_WRITE);
buff.compact();
}
}
/**
* 客戶端信道已經准備好了將數據從緩沖區寫入信道
*/
@Override
public void handleWrite(SelectionKey key) throws IOException {
String mss=TcpMessage.getCookieByMessage(TcpServer.message);
//解析得到傳入信息中包含的機器序列號,進行篩選,選擇指定的客戶端
if(mss.equals((String)key.attachment())){
ByteBuffer buff = ByteBuffer.allocate(bufSize);
buff.clear();
//拼接消息頭尾
buff.put(("$"+TcpServer.message+"#").getBytes());
buff.flip();
while(buff.hasRemaining()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(buff);
}
buff.compact();
}else {
return;
}
}
/**
* 處理消息邏輯 解析服務端送來的信息
* @param message 消息體
*/
public void handlerMessage(String message,SelectionKey key) {
int funCode = TcpMessage.getFunCodeByMessage(message);
//解析客服端信息,得到客戶端序列號
String cookie =TcpMessage.getCookieByMessage(message);
//作為唯一標識綁定進Key,方便指定用戶發送信息。
key.attach(cookie);
// 13功能:檔案ID心跳包,每10秒檔案櫃向后台發送1個檔案ID心跳包
if (funCode == 13) {
TcpMessage tcpMessage = new HeartMessageHandler(message);
log.debug(" 心跳包 message :"+tcpMessage+" "+((HeartMessageHandler) tcpMessage).getArchivesIdList());
//14功能:溫濕度上傳
} else if (funCode == 14) {
TcpMessage tcpMessage=new HumitureMessageHandler(message);
log.debug(" 溫濕度 message :"+tcpMessage+" "+((HumitureMessageHandler) tcpMessage).getTemperatuerList());
// 15功能:存取檔案變化上傳
} else if (funCode == 15) {
log.debug("存取檔案變化" );
}
}
/**
* 得到去除消息$和#的messagelist
*/
public ArrayList getMessagelist(String data){
ArrayList messagelist = new ArrayList();
String[] strs=data.split("#");
for(int i=0,len=strs.length;i<len;i++){
messagelist.add(strs[i].substring(1,strs[i].length()));
}
return messagelist;
}
}
服務器主動發送信息!!!!!
package com.js.archive.tcp;
import java.util.Scanner;
public class ServerWriteThread implements Runnable {
/**
* 服務器端寫線程
*/
public ServerWriteThread(){
new Thread(this).start();
}
public void run(){
Scanner s= new Scanner(System.in);
while(true){
System.out.println("輸入:");
TcpServer.message = s.next();
TcpServer.signal = true;
}
}
}
這里只是利用key的附件功能,簡單的實現了客戶端信息的保存,簡單實現了指定客戶端發送消息!
當然也可以選擇使用一個Map將key和客戶端id進行綁定
以上僅是本人作為菜鳥的一點小心得。。僅供參考!
