自己想了一下怎么實現,就寫了,沒有深究是否合理.更多處理沒有寫下去,例如收件人不在線,應該保存在數據庫,等下一次連接的時候刷新map,再把數據發送過去,圖片發送也沒有做,也沒有用json格式
socket很奇怪,我用客戶端連接上了服務器,沒有發送消息的情況下,斷開電腦網絡,是不會出現問題,然后在把電腦網絡連接上,通訊依然正常,正常斷開也不出問題,但是用idea直接按stop鍵,那么服務端就會出問題了,讀取事件會一直為true,造成死循環,消耗CPU,所以必須要判斷一下客戶端連接是否斷開了
只需要把客戶端代碼啟動幾個,修改一些userName以及收件人,就可以測試,實現類似QQ微信即時通訊,聊天功能
服務端代碼
package serversocketchannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author ZhenWeiLai
*
*/
public class ServerSocketChannelNonBlocking {
private static ServerSocketChannel serverSocketChannel = null;
private static Charset charset = Charset.forName("GBK");//設置編碼集,用於編碼,解碼
private static Selector selector = null;
//保存客戶端的map
private static final ConcurrentHashMap<String,SocketChannel> clientSockets = new ConcurrentHashMap<>();
static{
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.socket().bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);//設置為非阻塞
selector = Selector.open();//實例化一個選擇器
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
service();
}
private static void service(){
SocketChannel clientChannel = null;
SelectionKey selectionKey = null;
SocketChannel targetChannel = null;
try {
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);//服務端監聽連接
while(true){
selector.select();//阻塞至有新的連接就開始處理
Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
while(selectionKeys.hasNext()){
selectionKey = selectionKeys.next();
if(selectionKey.isAcceptable()){//如果事件是連接事件
ServerSocketChannel serverChannel = (ServerSocketChannel)selectionKey.channel();//獲取事件綁定的channel
clientChannel = serverChannel.accept();//連接獲取帶客戶端信息的socketChannel
clientChannel.configureBlocking(false);//客戶設置為非阻塞,因為非阻塞才支持選擇器.避免盲等浪費資源
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//作為每一個客戶端的附件緩沖器
/**
* 只監聽讀事件,這里千萬別監聽寫事件,因為只要連接有效,那么寫事件會一直為true,導致死循環,很耗資源
* 可以跟serverSocket用同一個選擇器,因為綁定的channel不同
*/
clientChannel.register(selector,SelectionKey.OP_READ,byteBuffer);
}else if(selectionKey.isReadable()){//只要有客戶端寫入,那么就可以處理
//獲取客戶端附件,也就是寫入的數據
ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment();
//從selectionKey獲取客戶端的channel
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//把附件讀出,解碼為字符串
String msg = read(socketChannel,byteBuffer);
//這里用了->分割收件人,->后面跟着的字符串是收件人
if(msg.indexOf("->")!=-1){
//內容
String content = msg.substring(0,msg.lastIndexOf("->"));
//從map里獲取收件人的socket
targetChannel = clientSockets.get(msg.substring(msg.lastIndexOf("->")+2));
//實例化一個緩沖區,用來寫出到收件人的socketChannel
ByteBuffer temp = ByteBuffer.allocate(1024);
temp.put(charset.encode(content));
//寫出
handleWrite(targetChannel,temp);
}else{
//如果內容沒有收件人,那么視為第一次連接,客戶端發過來的userName,作為KEY存入MAP
clientSockets.put(msg,socketChannel);
}
}
selectionKeys.remove();
}
}
} catch (IOException e) {
try {
if(selectionKey!=null)selectionKey.cancel();
if(clientChannel!=null){
clientChannel.shutdownInput();
clientChannel.shutdownOutput();
clientChannel.close();
}
if(targetChannel!=null){
targetChannel.shutdownInput();
targetChannel.shutdownOutput();
targetChannel.close();
}
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
e.printStackTrace();
}
}
private static String read(SocketChannel socketChannel,ByteBuffer byteBuffer){
//重置position limit為寫入做准備
byteBuffer.clear();
try {
int flag =socketChannel.read(byteBuffer);
//判斷客戶端是否斷開連接
if(flag==-1){
//如果客戶端無故斷開,一定要關閉,否則讀取事件一直為true造成死循環,非常耗資源
socketChannel.close();
}
} catch (IOException e) {
try {
socketChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
//position =0 limit等於有效下標,為寫出做准備
byteBuffer.flip();
return charset.decode(byteBuffer).toString();
}
//寫出
private static void handleWrite(SocketChannel socketChannel,ByteBuffer byteBuffer){
synchronized (byteBuffer) {
byteBuffer.flip();
try {
socketChannel.write(byteBuffer);
} catch (IOException e) {
try {
socketChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
}
}
}
客戶端代碼
package socketchannel;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* Created by lzw on 17-2-28.
*/
public class SocketChannelNonBlockingClient {
private static Charset charset = Charset.forName("GBK");
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(10240);
private static ByteBuffer sendBuffer = ByteBuffer.allocate(10240);
private static SocketChannel socketChannel = null;
private static Selector selector = null;
private static String userName = "client1";//客戶端名
private static String targetName = "client2";//收件人名
public static void main(String[] args) {
try {
socketChannel = SocketChannel.open();
//連接到服務端
SocketAddress socketAddress = new InetSocketAddress("19.95.103.112",8000);
selector = Selector.open();//實例化一個選擇器
socketChannel.configureBlocking(false);//設置為非阻塞
//先監聽一個連接事件
socketChannel.register(selector,SelectionKey.OP_CONNECT);
//連接
socketChannel.connect(socketAddress);
//jdk 1.8的lambda表達式,用一個線程監控控制台輸入
new Thread(()->{
try {
receiveFromUser();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
talk();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static void talk(){
try {
while(true){
selector.select();//阻塞直到連接事件
Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator();
while(readyKeys.hasNext()){
SelectionKey key =readyKeys.next();
if(key.isConnectable()){
//非阻塞的情況下可能沒有連接完成,這里調用finishConnect阻塞至連接完成
socketChannel.finishConnect();
//連接完成以后,先發送自己的userName以便保存在服務端的客戶端map里面
synchronized (sendBuffer){
SocketChannel socketChannel1 = (SocketChannel)key.channel();
sendBuffer.clear();
sendBuffer.put(charset.encode(userName));
send(socketChannel1);
socketChannel.register(selector,SelectionKey.OP_READ);//僅監聽一個讀取事件
}
}else if(key.isReadable()){
//處理讀事件
receive(key);
}
readyKeys.remove();
}
}
} catch (ClosedChannelException e) {
try {
socketChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 從控制台獲取用戶輸入
* @throws IOException
*/
private static void receiveFromUser() throws IOException{
//阻塞直到控制台有輸入
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(String msg = br.readLine();msg!=null&&!msg.equals("bye");msg = br.readLine()){
//同步鎖避免線程競爭
synchronized (sendBuffer) {
sendBuffer.clear();
//編碼
sendBuffer.put(charset.encode(msg));
//分割副
sendBuffer.put(charset.encode("->"));
//目標名
sendBuffer.put(charset.encode(targetName));
send(socketChannel);
}
}
}
/**
* 接收服務端的數據
* @param key
*/
private static void receive(SelectionKey key) throws IOException {
//獲取服務端的channel
SocketChannel channel = (SocketChannel) key.channel();
//為寫入緩沖器做准備position=0,limit=capacity
receiveBuffer.clear();
//從服務端的channel把數據讀入緩沖器
channel.read(receiveBuffer);
//position=0,limit=有效下標最后一位
receiveBuffer.flip();
//解碼
String msg = charset.decode(receiveBuffer).toString();
//輸出到控制台
System.out.println(msg);
}
/**
* 發送到服務端
*/
private static void send(SocketChannel sendChannel) throws IOException {
if(sendBuffer.remaining()!=0){
synchronized (sendBuffer){
sendBuffer.flip();
sendChannel.write(sendBuffer);
}
}
}
}

