在網絡的初期,網民很少,服務器完全無壓力,那時的技術也沒有現在先進,通常用一個線程來全程跟蹤處理一個請求。因為這樣最簡單。
其實代碼實現大家都知道,就是服務器上有個ServerSocket在某個端口監聽,接收到客戶端的連接后,會創建一個Socket,並把它交給一個線程進行后續處理。
線程主要從Socket讀取客戶端傳過來的數據,然后進行業務處理,並把結果再寫入Socket傳回客戶端。
由於網絡的原因,Socket創建后並不一定能立刻從它上面讀取數據,可能需要等一段時間,此時線程也必須一直阻塞着。在向Socket寫入數據時,也可能會使線程阻塞。
這里准備了一個示例,主要邏輯如下:
客戶端:創建20個Socket並連接到服務器上,再創建20個線程,每個線程負責一個Socket。
服務器端:接收到這20個連接,創建20個Socket,接着創建20個線程,每個線程負責一個Socket。
為了模擬服務器端的Socket在創建后不能立馬讀取數據,讓客戶端的20個線程分別休眠5-10之間的一個隨機秒數。
客戶端的20個線程會在第5秒到第10秒這段時間內陸陸續續的向服務器端發送數據,服務器端的20個線程也會陸陸續續接收到數據。
/** * @author lixinjie * @since 2019-05-07 */ public class BioServer { static AtomicInteger counter = new AtomicInteger(0); static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) { try { ServerSocket ss = new ServerSocket(); ss.bind(new InetSocketAddress("localhost", 8080)); while (true) { Socket s = ss.accept(); processWithNewThread(s); } } catch (Exception e) { e.printStackTrace(); } } static void processWithNewThread(Socket s) { Runnable run = () -> { InetSocketAddress rsa = (InetSocketAddress)s.getRemoteSocketAddress(); System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + counter.incrementAndGet()); try { String result = readBytes(s.getInputStream()); System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.getAndDecrement()); s.close(); } catch (Exception e) { e.printStackTrace(); } }; new Thread(run).start(); } static String readBytes(InputStream is) throws Exception { long start = 0; int total = 0; int count = 0; byte[] bytes = new byte[1024]; //開始讀數據的時間 long begin = System.currentTimeMillis(); while ((count = is.read(bytes)) > -1) { if (start < 1) { //第一次讀到數據的時間 start = System.currentTimeMillis(); } total += count; } //讀完數據的時間 long end = System.currentTimeMillis(); return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs"; } static String time() { return sdf.format(new Date()); } } /** * @author lixinjie * @since 2019-05-07 */ public class Client { public static void main(String[] args) { try { for (int i = 0; i < 20; i++) { Socket s = new Socket(); s.connect(new InetSocketAddress("localhost", 8080)); processWithNewThread(s, i); } } catch (IOException e) { e.printStackTrace(); } } static void processWithNewThread(Socket s, int i) { Runnable run = () -> { try { //睡眠隨機的5-10秒,模擬數據尚未就緒 Thread.sleep((new Random().nextInt(6) + 5) * 1000); //寫1M數據,為了拉長服務器端讀數據的過程 s.getOutputStream().write(prepareBytes()); //睡眠1秒,讓服務器端把數據讀完 Thread.sleep(1000); s.close(); } catch (Exception e) { e.printStackTrace(); } }; new Thread(run).start(); } static byte[] prepareBytes() { byte[] bytes = new byte[1024*1024*1]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 1; } return bytes; } }
執行結果如下:
時間->IP:Port->線程Id:當前線程數 15:11:52->127.0.0.1:55201->10:1 15:11:52->127.0.0.1:55203->12:2 15:11:52->127.0.0.1:55204->13:3 15:11:52->127.0.0.1:55207->16:4 15:11:52->127.0.0.1:55208->17:5 15:11:52->127.0.0.1:55202->11:6 15:11:52->127.0.0.1:55205->14:7 15:11:52->127.0.0.1:55206->15:8 15:11:52->127.0.0.1:55209->18:9 15:11:52->127.0.0.1:55210->19:10 15:11:52->127.0.0.1:55213->22:11 15:11:52->127.0.0.1:55214->23:12 15:11:52->127.0.0.1:55217->26:13 15:11:52->127.0.0.1:55211->20:14 15:11:52->127.0.0.1:55218->27:15 15:11:52->127.0.0.1:55212->21:16 15:11:52->127.0.0.1:55215->24:17 15:11:52->127.0.0.1:55216->25:18 15:11:52->127.0.0.1:55219->28:19 15:11:52->127.0.0.1:55220->29:20 時間->等待數據的時間,讀取數據的時間,總共讀取的字節數->線程Id:當前線程數 15:11:58->wait=5012ms,read=1022ms,total=1048576bs->17:20 15:11:58->wait=5021ms,read=1022ms,total=1048576bs->13:19 15:11:58->wait=5034ms,read=1008ms,total=1048576bs->11:18 15:11:58->wait=5046ms,read=1003ms,total=1048576bs->12:17 15:11:58->wait=5038ms,read=1005ms,total=1048576bs->23:16 15:11:58->wait=5037ms,read=1010ms,total=1048576bs->22:15 15:11:59->wait=6001ms,read=1017ms,total=1048576bs->15:14 15:11:59->wait=6016ms,read=1013ms,total=1048576bs->27:13 15:11:59->wait=6011ms,read=1018ms,total=1048576bs->24:12 15:12:00->wait=7005ms,read=1008ms,total=1048576bs->20:11 15:12:00->wait=6999ms,read=1020ms,total=1048576bs->14:10 15:12:00->wait=7019ms,read=1007ms,total=1048576bs->26:9 15:12:00->wait=7012ms,read=1015ms,total=1048576bs->21:8 15:12:00->wait=7023ms,read=1008ms,total=1048576bs->25:7 15:12:01->wait=7999ms,read=1011ms,total=1048576bs->18:6 15:12:02->wait=9026ms,read=1014ms,total=1048576bs->10:5 15:12:02->wait=9005ms,read=1031ms,total=1048576bs->19:4 15:12:03->wait=10007ms,read=1011ms,total=1048576bs->16:3 15:12:03->wait=10006ms,read=1017ms,total=1048576bs->29:2 15:12:03->wait=10010ms,read=1022ms,total=1048576bs->28:1
可以看到服務器端確實為每個連接創建一個線程,共創建了20個線程。
客戶端進入休眠約5-10秒,模擬連接上數據不就緒,服務器端線程在等待,等待時間約5-10秒。
客戶端陸續結束休眠,往連接上寫入1M數據,服務器端開始讀取數據,整個讀取過程約1秒。
可以看到,服務器端的工作線程會把時間花在“等待數據”和“讀取數據”這兩個過程上。
這有兩個不好的地方:
- 一是有很多客戶端同時發起請求的話,服務器端要創建很多的線程,可能會因為超過了上限而造成崩潰。
- 二是每個線程的大部分時光中都是在阻塞着,無事可干,造成極大的資源浪費。
開頭已經說了那個年代網民很少,所以,不可能會有大量請求同時過來。至於資源浪費就浪費吧,反正閑着也是閑着。
來個簡單的小例子:
飯店共有10張桌子,且配備了10位服務員。只要有客人來了,大堂經理就把客人帶到一張桌子,並安排一位服務員全程陪同。
即使客人暫時不需要服務,服務員也一直在旁邊站着。可能覺着是一種浪費,其實非也,這就是尊貴的VIP服務。
其實,VIP映射的是一對一的模型,主要體現在“專用”上或“私有”上。
真正的多路復用技術
多路復用技術原本指的是,在通信方面,多種信號或數據(從宏觀上看)交織在一起,使用同一條傳輸通道進行傳輸。
這樣做的目的,一方面可以充分利用通道的傳輸能力,另一方面自然是省時省力省錢啦。
其實這個概念非常的“生活化”,隨手就可以舉個例子:
一條小水渠里水在流,在一端往里倒入大量乒乓球,在另一端用網進行過濾,把乒乓球和水流分開。
這就是一個比較“土”的多路復用,首先在發射端把多種信號或數據進行“混合”,接着是在通道上進行傳輸,最后在接收端“分離”出自己需要的信號或數據。
相信大家都看出來了,這里的重點其實就是處理好“混合”和“分離”,對於不同的信號或數據,有不同的處理方法。
比如以前的有線電視是模擬信號,即電磁波。一家一般只有一根信號線,但可以同時接多個電視,每個電視任意換台,互不影響。
這是由於不同頻率的波可以混合和分離。(當然,可能不是十分准確,明白意思就行了。)
再比如城市的高鐵站一般都有數個站台供高鐵(同時)停靠,但城市間的高鐵軌道單方向只有一條,如何保證那么多趟高鐵安全運行呢?
很明顯是分時使用,每趟高鐵都有自己的時刻。多趟高鐵按不同的時刻出站相當於混合,按不同的時刻進站相當於分離。
總結一下,多路指的是多種不同的信號或數據或其它事物,復用指的是共用同一個物理鏈路或通道或載體。
可見,多路復用技術是一種一對多的模型,“多”的這一方復用了“一”的這一方。
其實,一對多的模型主要體現在“公用”上或“共享”上。
您先看着,我一會再過來
一對一服務是典型的有錢任性,雖然響應及時、服務周到,但不是每個人都能享受的,畢竟還是“屌絲”多嘛,那就來個共享服務吧。
所以實際當中更多的情況是,客人坐下后,會給他一個菜單,讓他先看着,反正也不可能立馬點餐,服務員就去忙別的了。
可能不時的會有服務員從客人身旁經過,發現客人還沒有點餐,就會主動去詢問現在需要點餐嗎?
如果需要,服務員就給你寫菜單,如果不需要,服務員就繼續往前走了。
這種情況飯店整體運行的也很好,但是服務員人數少多了。現在服務10桌客人,4個服務員綽綽有余。(這節省的可都是純利潤呀。)
因為10桌客人同時需要服務的情況幾乎是不會發生的,絕大部分情況都是錯開的。如果真有的話,那就等會好了,又不是120/119,人命關天的。
回到代碼里,情況與之非常相似,完全可以采用相同的理論去處理。
連接建立后,找個地方把它放到那里,可以暫時先不管它,反正此時也沒有數據可讀。
但是數據早晚會到來的,所以,要不時的去詢問每個連接有數據沒有,有的話就讀取數據,沒有的話就繼續不管它。
其實這個模式在Java里早就有了,就是Java NIO,這里的大寫字母“N”是單詞“New”,即“新”的意思,主要是為了和上面的“一對一”進行區分。
先鋪墊一下吧
現在需要把Socket交互的過程再稍微細化一些。客戶端先請求連接,connect,服務器端然后接受連接,accept,然后客戶端再向連接寫入數據,write,接着服務器端從連接上讀出數據,read。
和打電話的場景一樣,主叫撥號,connect,被叫接聽,accept,主叫說話,speak,被叫聆聽,listen。主叫給被叫打電話,說明主叫找被叫有事,所以被叫關注的是接通電話,聽對方說。
客戶端主動向服務器端發起請求,說明客戶端×××器端有事,所以服務器端關注的是接受請求,讀取對方傳來的數據。這里把接受請求,讀取數據稱為服務器端感興趣的操作。
在Java NIO中,接受請求的操作,用OP_ACCEPT表示,讀取數據的操作,用OP_READ表示。
我決定先過一遍飯店的場景,讓首次接觸Java NIO的同學不那么迷茫。就是把常規的場景進行了定向整理,稍微有點刻意,明白意思就行了。
1、專門設立一個“跑腿”服務員,工作職責單一,就是問問客人是否需要服務。
2、站在門口接待客人,本來是大堂經理的工作,但是他不願意在門口盯着,於是就委托給跑腿服務員,你幫我盯着,有人來了告訴我。
於是跑腿服務員就有了一個任務,替大堂經理盯梢。終於來客人了,跑腿服務員趕緊告訴了大堂經理。
3、大堂經理把客人帶到座位上,對跑腿服務員說,客人接下來肯定是要點餐的,但是現在在看菜單,不知道什么時候能看好,所以你不時的過來問問,看需不需要點餐,需要的話就再喊來一個“點餐”服務員給客人寫菜單。
於是跑腿服務員就又多了一個任務,就是盯着這桌客人,不時來問問,如果需要服務的話,就叫點餐服務員過來服務。
4、跑腿服務員在某次詢問中,客人終於決定點餐了,跑題服務員趕緊找來一個點餐服務員為客人寫菜單。
5、就這樣,跑腿服務員既要盯着門外新過來的客人,也要盯着門內已經就坐的客人。新客人來了,通知大堂經理去接待。就坐的客人決定點餐了,通知點餐服務員去寫菜單。
事情就這樣一直循環的持續下去,一切,都挺好。角色明確,職責單一,配合很好。
大堂經理和點餐服務員是需求的提供者或實現者,跑腿服務員是需求的發現者,並識別出需求的種類,需要接待的交給大堂經理,需要點餐的交給點餐服務員。
哈哈,Java NIO來啦
代碼的寫法非常的固定,可以配合着后面的解說來看,這樣就好理解了,如下:
/** * @author lixinjie * @since 2019-05-07 */ public class NioServer { static int clientCount = 0; static AtomicInteger counter = new AtomicInteger(0); static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress("localhost", 8080)); while (true) { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel(); SocketChannel sc = null; while ((sc = ssc1.accept()) != null) { sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress(); System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount)); } } else if (key.isReadable()) { //先將“讀”從感興趣操作移出,待把數據從通道中讀完后,再把“讀”添加到感興趣操作中 //否則,該通道會一直被選出來 key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ)); processWithNewThread((SocketChannel)key.channel(), key); } } } } catch (Exception e) { e.printStackTrace(); } } static void processWithNewThread(SocketChannel sc, SelectionKey key) { Runnable run = () -> { counter.incrementAndGet(); try { String result = readBytes(sc); //把“讀”加進去 key.interestOps(key.interestOps() | SelectionKey.OP_READ); System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get()); sc.close(); } catch (Exception e) { e.printStackTrace(); } counter.decrementAndGet(); }; new Thread(run).start(); } static String readBytes(SocketChannel sc) throws Exception { long start = 0; int total = 0; int count = 0; ByteBuffer bb = ByteBuffer.allocate(1024); //開始讀數據的時間 long begin = System.currentTimeMillis(); while ((count = sc.read(bb)) > -1) { if (start < 1) { //第一次讀到數據的時間 start = System.currentTimeMillis(); } total += count; bb.clear(); } //讀完數據的時間 long end = System.currentTimeMillis(); return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs"; } static String time() { return sdf.format(new Date()); } }
它的大致處理過程如下:
1、定義一個選擇器,Selector。
相當於設立一個跑腿服務員。
2、定義一個服務器端套接字通道,ServerSocketChannel,並配置為非阻塞的。
相等於聘請了一位大堂經理。
3、將套接字通道注冊到選擇器上,並把感興趣的操作設置為OP_ACCEPT。
相當於大堂經理給跑腿服務員說,幫我盯着門外,有客人來了告訴我。
4、進入死循環,選擇器不時的進行選擇。
相當於跑腿服務員一遍又一遍的去詢問、去轉悠。
5、選擇器終於選擇出了通道,發現通道是需要Acceptable的。
相當於跑腿服務員終於發現門外來客人了,客人是需要接待的。
6、於是服務器端套接字接受了這個通道,開始處理。
相當於跑腿服務員把大堂經理叫來了,大堂經理開始着手接待。
7、把新接受的通道配置為非阻塞的,並把它也注冊到了選擇器上,該通道感興趣的操作為OP_READ。
相當於大堂經理把客人帶到座位上,給了客人菜單,並又把客人委托給跑腿服務員,說客人接下來肯定是要點餐的,你不時的來問問。
8、選擇器繼續不時的進行選擇着。
相當於跑腿服務員繼續不時的詢問着、轉悠着。
9、選擇器終於又選擇出了通道,這次發現通道是需要Readable的。
相當於跑腿服務員終於發現了一桌客人有了需求,是需要點餐的。
10、把這個通道交給了一個新的工作線程去處理。
相當於跑腿服務員叫來了點餐服務員,點餐服務員開始為客人寫菜單。
11、這個工作線程處理完后,就被回收了,可以再去處理其它通道。
相當於點餐服務員寫好菜單后,就走了,可以再去為其他客人寫菜單。
12、選擇器繼續着重復的選擇工作,不知道什么時候是個頭。
相當於跑腿服務員繼續着重復的詢問、轉悠,不知道未來在何方。
相信你已經看出來了,大堂經理相當於服務器端套接字,跑腿服務員相當於選擇器,點餐服務員相當於Worker線程。
啟動服務器端代碼,使用同一個客戶端代碼,按相同的套路發20個請求,結果如下:
時間->IP:Port->主線程Id:當前連接數 16:34:39->127.0.0.1:56105->1:1 16:34:39->127.0.0.1:56106->1:2 16:34:39->127.0.0.1:56107->1:3 16:34:39->127.0.0.1:56108->1:4 16:34:39->127.0.0.1:56109->1:5 16:34:39->127.0.0.1:56110->1:6 16:34:39->127.0.0.1:56111->1:7 16:34:39->127.0.0.1:56112->1:8 16:34:39->127.0.0.1:56113->1:9 16:34:39->127.0.0.1:56114->1:10 16:34:39->127.0.0.1:56115->1:11 16:34:39->127.0.0.1:56116->1:12 16:34:39->127.0.0.1:56117->1:13 16:34:39->127.0.0.1:56118->1:14 16:34:39->127.0.0.1:56119->1:15 16:34:39->127.0.0.1:56120->1:16 16:34:39->127.0.0.1:56121->1:17 16:34:39->127.0.0.1:56122->1:18 16:34:39->127.0.0.1:56123->1:19 16:34:39->127.0.0.1:56124->1:20 時間->等待數據的時間,讀取數據的時間,總共讀取的字節數->線程Id:當前線程數 16:34:45->wait=1ms,read=1018ms,total=1048576bs->11:5 16:34:45->wait=0ms,read=1054ms,total=1048576bs->10:5 16:34:45->wait=0ms,read=1072ms,total=1048576bs->13:6 16:34:45->wait=0ms,read=1061ms,total=1048576bs->14:5 16:34:45->wait=0ms,read=1140ms,total=1048576bs->12:4 16:34:46->wait=0ms,read=1001ms,total=1048576bs->15:5 16:34:46->wait=0ms,read=1062ms,total=1048576bs->17:6 16:34:46->wait=0ms,read=1059ms,total=1048576bs->16:5 16:34:47->wait=0ms,read=1001ms,total=1048576bs->19:4 16:34:47->wait=0ms,read=1001ms,total=1048576bs->20:4 16:34:47->wait=0ms,read=1015ms,total=1048576bs->18:3 16:34:47->wait=0ms,read=1001ms,total=1048576bs->21:2 16:34:48->wait=0ms,read=1032ms,total=1048576bs->22:4 16:34:49->wait=0ms,read=1002ms,total=1048576bs->23:3 16:34:49->wait=0ms,read=1001ms,total=1048576bs->25:2 16:34:49->wait=0ms,read=1028ms,total=1048576bs->24:4 16:34:50->wait=0ms,read=1008ms,total=1048576bs->28:4 16:34:50->wait=0ms,read=1033ms,total=1048576bs->27:3 16:34:50->wait=1ms,read=1002ms,total=1048576bs->29:2 16:34:50->wait=0ms,read=1001ms,total=1048576bs->26:2
服務器端接受20個連接,創建20個通道,並把它們注冊到選擇器上,此時不需要額外線程。
當某個通道已經有數據時,才會用一個線程來處理它,所以,線程“等待數據”的時間是0,“讀取數據”的時間還是約1秒。
因為20個通道是陸陸續續有數據的,所以服務器端最多時是6個線程在同時運行的,換句話說,用包含6個線程的線程池就可以了。
對比與結論:
處理同樣的20個請求,一個需要用20個線程,一個需要用6個線程,節省了70%線程數。
在本例中,兩種感興趣的操作共用一個選擇器,且選擇器運行在主線程里,Worker線程是新的線程。
其實對於選擇器的個數、選擇器運行在哪個線程里、是否使用新的線程來處理請求都沒有要求,要根據實際情況來定。
比如說redis,和處理請求相關的就一個線程,選擇器運行在里面,處理請求的程序也運行在里面,所以這個線程既是I/O線程,也是Worker線程。
當然,也可以使用兩個選擇器,一個處理OP_ACCEPT,一個處理OP_READ,讓它們分別運行在兩個單獨的I/O線程里。對於能快速完成的操作可以直接在I/O線程里做了,對於非常耗時的操作一定要使用Worker線程池來處理。
這種處理模式就是被稱為的多路復用I/O,多路指的是多個Socket通道,復用指的是只用一個線程來管理它們。
再稍微分析一下
一對一的形式,一個桌子配一個服務員,一個Socket分配一個線程,響應速度最快,畢竟是VIP嘛,但是效率很低,服務員大部分時間都是在站着,線程大部分時間都是在等待。
多路復用的形式,所有桌子共用一個跑腿服務員,所有Socket共用一個選擇器線程,響應速度肯定變慢了,畢竟是一對多嘛。但是效率提高了,點餐服務員在需要點餐時才會過去,工作線程在數據就緒時才會開始工作。
從VIP到多路復用,形式上確實有很大的不同,其本質是從一對一到一對多的轉變,其實就是犧牲了響應速度,換來了效率的提升,不過綜合性能還是得到了極大的改進。
就飯店而言,究竟幾張桌子配一個跑腿服務員,幾張桌子配一個點餐服務員,經過一段時間運行,一定會有一個最優解。
就程序而言,究竟需要幾個選擇器線程,幾個工作線程,經過評估測試后,也會有一個最優解。
一旦達到最優解后,就不可能再提升了,這同樣是由多路復用這種一對多的形式所限制的。就像一對一的形式限制一樣。
人們的追求是無止境的,如何對多路復用繼續提升呢?答案一定是具有顛覆性的,即拋棄多路復用,采用全新的形式。
還以飯店為例,如何在最優解的情況下,既要繼續減少服務員數量,還要使效率提升呢?可能有些朋友已經猜到了,那就是拋棄服務員服務客人這種模式,把飯店改成自助餐廳。
在客人進門時,把餐具給他,並告訴他就餐時長、不准浪費等這些規則,然后就不用管了。客人自己選餐,自己吃完,自己走人,不用再等服務員了,因此也不再需要服務員了。(收拾桌子的除外。)
這種模式對應到程序里,其實就是AIO,在Java里也早就有了。
嘻嘻,Java AIO來啦
代碼的寫法非常的固定,可以配合着后面的解說來看,這樣就好理解了,如下:
/** * @author lixinjie * @since 2019-05-13 */ public class AioServer { static int clientCount = 0; static AtomicInteger counter = new AtomicInteger(0); static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) { try { AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(); assc.bind(new InetSocketAddress("localhost", 8080)); //非阻塞方法,其實就是注冊了個回調,而且只能接受一個連接 assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel asc, Object attachment) { //再次注冊,接受下一個連接 assc.accept(null, this); try { InetSocketAddress rsa = (InetSocketAddress)asc.getRemoteAddress(); System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount)); } catch (Exception e) { } readFromChannelAsync(asc); } @Override public void failed(Throwable exc, Object attachment) { } }); //不讓主線程退出 synchronized (AioServer.class) { AioServer.class.wait(); } } catch (Exception e) { e.printStackTrace(); } } static void readFromChannelAsync(AsynchronousSocketChannel asc) { //會把數據讀入到該buffer之后,再觸發工作線程來執行回調 ByteBuffer bb = ByteBuffer.allocate(1024*1024*1 + 1); long begin = System.currentTimeMillis(); //非阻塞方法,其實就是注冊了個回調,而且只能接受一次讀取 asc.read(bb, null, new CompletionHandler<Integer, Object>() { //從該連接上一共讀到的字節數 int total = 0; /** * @param count 表示本次讀取到的字節數,-1表示數據已讀完 */ @Override public void completed(Integer count, Object attachment) { counter.incrementAndGet(); if (count > -1) { total += count; } int size = bb.position(); System.out.println(time() + "->count=" + count + ",total=" + total + "bs,buffer=" + size + "bs->" + Thread.currentThread().getId() + ":" + counter.get()); if (count > -1) {//數據還沒有讀完 //再次注冊回調,接受下一次讀取 asc.read(bb, null, this); } else {//數據已讀完 try { asc.close(); } catch (Exception e) { e.printStackTrace(); } } counter.decrementAndGet(); } @Override public void failed(Throwable exc, Object attachment) { } }); long end = System.currentTimeMillis(); System.out.println(time() + "->exe read req,use=" + (end -begin) + "ms" + "->" + Thread.currentThread().getId()); } static String time() { return sdf.format(new Date()); } }
它的大致處理過程如下:
1、初始化一個AsynchronousServerSocketChannel對象,並開始監聽
2、通過accept方法注冊一個“完成處理器”的接受連接回調,即CompletionHandler,用於在接受到連接后的相關操作。
3、當客戶端連接過來后,由系統來接受,並創建好AsynchronousSocketChannel對象,然后觸發該回調,並把該對象傳進該回調,該回調會在Worker線程中執行。
4、在接受連接回調里,再次使用accept方法注冊一次相同的完成處理器對象,用於讓系統接受下一個連接。就是這種注冊只能使用一次,所以要不停的連續注冊,人家就是這樣設計的。
5、在接受連接回調里,使用AsynchronousSocketChannel對象的read方法注冊另一個接受數據回調,用於在接受到數據后的相關操作。
6、當客戶端數據過來后,由系統接受,並放入指定好的ByteBuffer中,然后觸發該回調,並把本次接受到的數據字節數傳入該回調,該回調會在Worker線程中執行。
7、在接受數據回調里,如果數據沒有接受完,需要再次使用read方法把同一個對象注冊一次,用於讓系統接受下一次數據。這和上面的套路是一樣的。
8、客戶端的數據可能是分多次傳到服務器端的,所以接受數據回調會被執行多次,直到數據接受完為止。多次接受到的數據合起來才是完整的數據,這個一定要處理好。
9、關於ByteBuffer,要么足夠的大,能夠裝得下完整的客戶端數據,這樣多次接受的數據直接往里追加即可。要么每次把ByteBuffer中的數據移到別的地方存儲起來,然后清空ByteBuffer,用於讓系統往里裝入下一次接受的數據。
注:如果出現ByteBuffer空間不足,則系統不會裝入數據,就會導致客戶端數據總是讀不完,極有可能進入死循環。
啟動服務器端代碼,使用同一個客戶端代碼,按相同的套路發20個請求,結果如下:
時間->IP:Port->回調線程Id:當前連接數 17:20:47->127.0.0.1:56454->15:1 時間->發起一個讀請求,耗時->回調線程Id 17:20:47->exe read req,use=3ms->15 17:20:47->127.0.0.1:56455->15:2 17:20:47->exe read req,use=1ms->15 17:20:47->127.0.0.1:56456->15:3 17:20:47->exe read req,use=0ms->15 17:20:47->127.0.0.1:56457->16:4 17:20:47->127.0.0.1:56458->15:5 17:20:47->exe read req,use=1ms->16 17:20:47->exe read req,use=1ms->15 17:20:47->127.0.0.1:56460->15:6 17:20:47->127.0.0.1:56459->17:7 17:20:47->exe read req,use=0ms->15 17:20:47->127.0.0.1:56462->15:8 17:20:47->127.0.0.1:56461->16:9 17:20:47->exe read req,use=1ms->15 17:20:47->exe read req,use=0ms->16 17:20:47->exe read req,use=0ms->17 17:20:47->127.0.0.1:56465->16:10 17:20:47->127.0.0.1:56463->18:11 17:20:47->exe read req,use=0ms->18 17:20:47->127.0.0.1:56466->15:12 17:20:47->exe read req,use=1ms->16 17:20:47->127.0.0.1:56464->17:13 17:20:47->exe read req,use=1ms->15 17:20:47->127.0.0.1:56467->18:14 17:20:47->exe read req,use=2ms->17 17:20:47->exe read req,use=1ms->18 17:20:47->127.0.0.1:56468->15:15 17:20:47->exe read req,use=1ms->15 17:20:47->127.0.0.1:56469->16:16 17:20:47->127.0.0.1:56470->18:17 17:20:47->exe read req,use=1ms->18 17:20:47->exe read req,use=1ms->16 17:20:47->127.0.0.1:56472->15:18 17:20:47->127.0.0.1:56473->19:19 17:20:47->exe read req,use=2ms->15 17:20:47->127.0.0.1:56471->17:20 17:20:47->exe read req,use=1ms->19 17:20:47->exe read req,use=1ms->17 時間->本次接受到的字節數,截至到目前接受到的字節總數,buffer中的字節總數->回調線程Id:當前線程數 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1 17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1 17:20:52->count=230188,total=295724bs,buffer=295724bs->12:1 17:20:52->count=752852,total=1048576bs,buffer=1048576bs->14:3 17:20:52->count=131072,total=196608bs,buffer=196608bs->17:2 。。。。。。。。。。。。。。。。。。。。。。 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1 17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
系統接受到連接后,在工作線程中執行了回調。並且在回調中執行了read方法,耗時是0,因為只是注冊了個接受數據的回調而已。
系統接受到數據后,把數據放入ByteBuffer,在工作線程中執行了回調。並且回調中可以直接使用ByteBuffer中的數據。
接受數據的回調被執行了多次,多次接受到的數據加起來正好等於客戶端傳來的數據。
因為系統是接受到數據后才觸發的回調,所以服務器端最多時是3個線程在同時運行回調的,換句話說,線程池包含3個線程就可以了。
對比與結論:
處理同樣的20個請求,一個需要用20個線程,一個需要用6個線程,一個需要3個線程,又節省了50%線程數。
注:不用特別較真這個比較結果,這里只是為了說明問題而已。哈哈。
三種處理方式的對比
第一種是阻塞IO,阻塞點有兩個,等待數據就緒的過程和讀取數據的過程。
第二種是阻塞IO,阻塞點有一個,讀取數據的過程。
第三種是非阻塞IO,沒有阻塞點,當工作線程啟動時,數據已經(被系統)准備好可以直接用了。
可見,這是一個逐步消除阻塞點的過程。
再次來談談各種IO:
只有一個線程,接受一個連接,讀取數據,處理業務,寫回結果,再接受下一個連接,這是同步阻塞。這種用法幾乎沒有。
一個線程和一個線程池,線程接受到連接后,把它丟給線程池中的線程,再接受下一個連接,這是異步阻塞。對應示例一。
一個線程和一個線程池,線程運行selector,執行select操作,把就緒的連接拿出來丟給線程池中的線程,再執行下一次的select操作,就是多路復用,這是異步阻塞。對應示例二。
一個線程和一個線程池,線程注冊一個accept回調,系統幫我們接受好連接后,才觸發回調在線程池中執行,執行時再注冊read回調,系統幫我們接受好數據后,才觸發回調在線程池中執行,就是AIO,這是異步非阻塞。對應示例三。
redis也是多路復用,但它只有一個線程在執行select操作,處理就緒的連接,整個是串行化的,所以天然不存在並發問題。只能把它歸為同步阻塞了。
BIO是阻塞IO,可以是同步阻塞,也可以是異步阻塞。AIO是異步IO,只有異步非阻塞這一種。因此沒有同步非阻塞這種說法,因為同步一定是阻塞的。
注:以上的說法是站在用戶程序/線程的立場上來說的。