首先請確保已經學習了Java NIO的基礎知識,包括Buffer,Channel文件通道和Socket通道,Selector。關於NIO比起I/O的好處,區別等這里就不說了。具體可以參考后面的參考鏈接等。
這篇博客主要以一個使用NIO傳輸文件的例子來學習NIO中網絡的基本操作
傳統的監控socket方式存在的問題
傳統的監控多個socket的Java解決方案是為每個socket創建一個線程並使得線程可以在read調用時阻塞,直到數據可用。實際上這種方案有很大的弊端就是當建立很多鏈接需要創建很多線程,這些線程的創建管理要耗費很大的資源,也許在這個連接上只發送少量的數據,但是CPU切換也要耗費好多資源。於是為了減少系統線程的開銷,采用線程池的辦法來減少線程創建和回收的成本,但是有一些使用場景仍然是無法解決的,如果建立的都是長連接,事實上它們並不是每時每刻都在傳輸數據,這時不可能創建那么多的連接。
使用NIO就可以有效的解決這個問題,利用NIO提供的選擇器Selector,可以使用一個線程管理多個連接,這實際上是一種I/O復用。
NIO傳輸文件例子
下面使用NIO做了一個向服務器端上傳文件的例子
服務器端代碼
public class Server {
private ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
//使用Map保存每個連接,當OP_READ就緒時,根據key找到對應的文件對其進行寫入。若將其封裝成一個類,作為值保存,可以再上傳過程中顯示進度等等
Map<SelectionKey, FileChannel> fileMap = new HashMap<SelectionKey, FileChannel>();
public static void main(String[] args) throws IOException{
Server server = new Server();
server.startServer();
}
public void startServer() throws IOException{
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(8888));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務器已開啟...");
while (true) {
int num = selector.select();
if (num == 0) continue;
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
ServerSocketChannel serverChannel1 = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverChannel1.accept();
if (socketChannel == null) continue;
socketChannel.configureBlocking(false);
SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_READ);
InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.getRemoteAddress();
File file = new File(remoteAddress.getHostName() + "_" + remoteAddress.getPort() + ".txt");
FileChannel fileChannel = new FileOutputStream(file).getChannel();
fileMap.put(key1, fileChannel);
System.out.println(socketChannel.getRemoteAddress() + "連接成功...");
writeToClient(socketChannel);
}
else if (key.isReadable()){
readData(key);
}
// NIO的特點只會累加,已選擇的鍵的集合不會刪除,ready集合會被清空
// 只是臨時刪除已選擇鍵集合,當該鍵代表的通道上再次有感興趣的集合准備好之后,又會被select函數選中
it.remove();
}
}
}
private void writeToClient(SocketChannel socketChannel) throws IOException {
buffer.clear();
buffer.put((socketChannel.getRemoteAddress() + "連接成功").getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
private void readData(SelectionKey key) throws IOException {
FileChannel fileChannel = fileMap.get(key);
buffer.clear();
SocketChannel socketChannel = (SocketChannel) key.channel();
int num = 0;
try {
while ((num = socketChannel.read(buffer)) > 0) {
buffer.flip();
// 寫入文件
fileChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
key.cancel();
e.printStackTrace();
return;
}
// 調用close為-1 到達末尾
if (num == -1) {
fileChannel.close();
System.out.println("上傳完畢");
buffer.put((socketChannel.getRemoteAddress() + "上傳成功").getBytes());
buffer.clear();
socketChannel.write(buffer);
key.cancel();
}
}
}
客戶端模擬三個客戶端同時向服務器發送文件
public class Client {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
// 模擬三個發端
new Thread() {
public void run() {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress("127.0.0.1", 8888));
File file = new File("E:\\" + 11 + ".txt");
FileChannel fileChannel = new FileInputStream(file).getChannel();
ByteBuffer buffer = ByteBuffer.allocate(100);
socketChannel.read(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit(), Charset.forName("utf-8")));
buffer.clear();
int num = 0;
while ((num=fileChannel.read(buffer)) > 0) {
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
if (num == -1) {
fileChannel.close();
socketChannel.shutdownOutput();
}
// 接受服務器
socketChannel.read(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit(), Charset.forName("utf-8")));
buffer.clear();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
};
}.start();
}
Thread.yield();
}
}
可見這里我們僅僅使用了一個線程就管理了三個連接,相比以前使用阻塞的Socket要在accept函數返回后開啟線程來管理這個連接,而使用NIO我們在accept返回后,僅僅將其注冊到選擇器上,讀操作在下次檢測到有可讀的鍵的集合時就會去處理。
NIO+線程池改進
當然使用單線程並不都是一個好主意,使用單線程的好處在於任何情況下都只有一個線程能夠運行。通過消除在線程之間進行上下文切換帶來的額外開銷,總吞吐量可以得到提高。然而在其他情況下,尤其是對於多核CPU來說,使用單線程是對CPU的浪費,更好的策略是使用一個線程來管理選擇器,監控通道的就緒狀態,對於數據的處理可以使用線程池處理。
這樣上面的例子改為
public class ThreadPoolServer extends Server{
private ExecutorService exec = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws IOException {
ThreadPoolServer server = new ThreadPoolServer();
server.startServer();
}
@Override
protected void readData(final SelectionKey key) throws IOException {
// 移除掉這個key的可讀事件,已經在線程池里面處理
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
exec.execute(new Runnable() {
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
FileChannel fileChannel = fileMap.get(key);
buffer.clear();
SocketChannel socketChannel = (SocketChannel) key.channel();
int num = 0;
try {
while ((num = socketChannel.read(buffer)) > 0) {
buffer.flip();
// 寫入文件
fileChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
key.cancel();
e.printStackTrace();
return;
}
// 調用close為-1 到達末尾
if (num == -1) {
try {
fileChannel.close();
System.out.println("上傳完畢");
buffer.put((socketChannel.getRemoteAddress() + "上傳成功").getBytes());
buffer.clear();
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
// 只有調用cancel才會真正從已選擇的鍵的集合里面移除,否則下次select的時候又能得到
// 一端close掉了,其對端仍然是可讀的,讀取得到EOF,返回-1
key.cancel();
return;
}
// Channel的read方法可能返回0,返回0並不一定代表讀取完了。
// 工作線程結束對通道的讀取,需要再次更新鍵的ready集合,將感興趣的集合重新放在里面
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
// 調用wakeup,使得選擇器上的第一個還沒有返回的選擇操作立即返回即重新select
key.selector().wakeup();
}
});
}
}
幾點說明
1.將通道注冊到一個選擇其中,會返回一個鍵,這個鍵代表了通道和選擇器之間的注冊關系
2.SelectionKey鍵包含兩個集合
interest集合:指示每個通道所關心的操作,這時在注冊通道時確定的,也可以使用鍵的帶參數的interestOps方法修改。
ready集合:表示通道已經在該操作上准備好,是interest集合的子集,表示了interest集合中從上次調用select以來已經就緒的那些操作。該集合不能修改,由操作系統告訴我們。
Selector包含三個集合
已注冊的鍵的集合(Registered key set):調用Channel的register方法
已選擇的鍵的集合(Selected key set)
已取消的鍵的集合(Canceled key set):調用key的cancel方法,select的cancel方法等都會將該key放在已取消的鍵的集合,通道關閉時,相關的鍵也會自動取消
3.當調用select方法時,會執行以下操作,這是理解NIO中調用各操作的關鍵。
1).檢查已取消的鍵的集合,非空,將該鍵從另外兩個集合移除,這步完成后,已取消的鍵的集合為空
2).檢查已注冊的鍵的集合中的鍵的interest集合,對於那些操作系統指示至少已經准備好interest集合中的一種操作的通道,將執行以下兩種操作中的一種:
a.該鍵沒有出在已選擇的鍵的集合里面:該鍵的ready集合被清空,當前已經就緒的操作將會被添加到ready集合中,然后將該鍵放入已選擇的鍵的集合中
b.否則,鍵存在於選擇器的已選擇的鍵的集合中,它的ready 集合將是累積的,只會設置這次操作系統決定的操作的位,之前的設置不會變。
3).重復執行1
4).select操作返回的值是ready集合在步驟2中被修改的鍵的數量,而不是已准備好的所有鍵的個數。
從上面的選擇過程可以知道一旦一個選擇器將一個鍵添加到它的已選擇的鍵的集合中,它就不會移除這個鍵,並且,一旦一個鍵處於已選擇的鍵的集合中,這個鍵的ready 集合將只會被設置,而不會被清理。於是管理鍵以確保它們正確的狀態信息的任務要由程序員來做。
程序中的remove方法將一個鍵從已選擇的集合中刪除,否則不會自動刪除,表示已經對這個鍵所代表的的通道進行了處理。
當一個通道上的操作執行完想要刪除該通道時,調用key的cancel方法,它將被加入到取消集合中,注冊的關系不會立刻取消,鍵會立即失效,下一次調用select方法,就會從另兩個集合中刪除,通道也會被注銷。
使用NIO+線程池時,在上面的程序中當有可讀事件時,交由線程池去處理,這個期間要忽略該通道的read操作,即將其從interest集合刪除。當線程結束給該通道的操作時,需要更新鍵的ready集合,將感興趣的集合重新放在里面。調用selector的wakeup方法是為了防止如果此時阻塞在select上,他會立刻返回。
4.tcp中仍然需要告訴對端結束標志,這里調用了shutDownOutput方法(NIO的API 1.7以后才有,可以使用結束標志),這樣另一端的read方法就會讀到EOF,返回-1
