Netty中的那些坑(上篇)
最近開發了一個純異步的redis客戶端,算是比較深入的使用了一把netty。在使用過程中一邊優化,一邊解決各種坑。兒這些坑大部分基本上是Netty4對Netty3的改進部分引起的。
注:這里說的坑不是說netty不好,只是如果這些地方不注意,或者不去看netty的代碼,就有可能掉進去了。
坑1: Netty 4的線程模型轉變
在Netty 3的時候,upstream是在IO線程里執行的,而downstream是在業務線程里執行的。比如netty從網絡讀取一個包傳遞給你的handler的時候,你的handler部分的代碼是執行在IO線程里,而你的業務線程調用write向網絡寫出一些東西的時候,你的handler是執行在業務線程里。而Netty 4修改了這一模型。在Netty 4里inbound(upstream)和outbound(downstream)都是執行在EventLoop(IO線程)里。也就是你如果在業務線程里通過channel.write向網絡寫出一些東西的時候,在某一點,netty 4會往這個channel的EventLoop里提交一個寫出的任務。那也就是業務線程和IO線程是異步執行的。
這有什么問題呢?一般我們在網絡通信里,業務層寫出的都是對象。然后經過序列化等手段轉換成字節流到網絡,而Netty給我們提供了很好的編碼解碼的模型,一般我們也會將序列化和反序列化放到一個handler里處理,而在Netty 4里這些handler都是在EventLoop里執行,那么就意味着在Netty 4里下面的代碼可能會導致一些微妙的結果:
User user = new User();
user.setName("admin");
channel.write(user);
user.setName("guest");
因為序列化和業務線程異步執行,那么在write執行后並不表示user對象已經序列化了,如果這個時候修改了user對象那么傳遞到peer的對象可能就不再是你期望的那個user了。所以在Netty 4里如果還是使用handler實現序列化就一定要小心了。你要么在調用channel.write寫出之前將對象進行深度拷貝,要么就不在handler里進行序列化了,直接將序列化好的東西傳遞給channel。—— 現在通常在業務線程做序列化。
2. 在不同的線程里使用PooledByteBufAllocator分配和回收
這個問題其實是上面一個問題的續集。在碰到之前一個問題后,我們就決定不再在handler里做序列化了,而是直接在業務線程里做。但是為了減少內存的拷貝,我們就期望在序列化的時候直接將字節流序列化到DirectByteBuf里,這樣通過socket寫出的時候就不進行拷貝了。而DirectByteBuf的分配成本比HeapByteBuf的成本要高,為此Netty 4借鑒jemalloc的思路實現了一個PooledByteBufAllocator。顧名思義,就是將DirectByteBuf池化起來,回收的時候不真正回收,分配的時候從池里取一個空閑的。這對於大多數應用來說優化效果還是很明顯的,比如在一些RPC場景中,我們所傳遞的對象的大小往往是差不多的,這可以充分利用池化的效果。
但是我們在使用類似下面的偽代碼的時候內存占用不斷飆高,然后瘋狂Full GC,並且有的時候還會出現OOM。這好像是內存泄漏的跡象:
//業務線程
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buffer = allocator.buffer();
User user = new User();
//將對象直接序列化到ByteBuf
serialization.serialize(buffer, user);
//進入EventLoop
channel.writeAndFlush(buffer);
上面的代碼表面看沒什么問題。但實際上,PooledByteBufAllocator為了減少鎖競爭,池是通過thread local來實現的。也就是分配的時候會從本線程(這里就是業務線程)的thread local里取。而channel.writeAndFlush調用后,在將buffer寫到socket后,這個buffer將被回收到池里。回收的時候也是通過thread local找到對應的池,回收掉。這樣就有一個問題,分配的時候是在業務線程,也就是說從業務線程的thread local對應的池里分配的,而回收的時候是在IO線程。這兩個是不同的線程。池的作用完全喪失了,一個線程不斷地去分配,不斷地轉移到另外一個池。
3. ByteBuf擴展引起的問題
其實這個問題和上面一個問題是一樣的。但是比之前的問題更加隱晦,就在你彈冠相慶的時候給你致命一擊。在碰到上面一個問題后我們就在想,既然分配和回收都得在同一個線程里執行,那我們是不是可以啟動一個專門的線程來負責分配和回收呢?於是就有了下面的代碼:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.ReferenceCountUtil; public class Allocator { public static final ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT; private static final BlockingQueue<ByteBuf> bufferQueue = new ArrayBlockingQueue<ByteBuf>( 100); private static final BlockingQueue<ByteBuf> toCleanQueue = new LinkedBlockingQueue<ByteBuf>(); private static final int TO_CLEAN_SIZE = 50; private static final long CLEAN_PERIOD = 100; private static class AllocThread implements Runnable { @Override public void run() { long lastCleanTime = System.currentTimeMillis(); while (!Thread.currentThread().isInterrupted()) { try { ByteBuf buffer = allocator.buffer(); //確保是本線程釋放 buffer.retain(); bufferQueue.put(buffer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } if (toCleanQueue.size() > TO_CLEAN_SIZE || System.currentTimeMillis() - lastCleanTime > CLEAN_PERIOD) { final List<ByteBuf> toClean = new ArrayList<ByteBuf>(toCleanQueue.size()); toCleanQueue.drainTo(toClean); for (ByteBuf buffer : toClean) { ReferenceCountUtil.release(buffer); } lastCleanTime = System.currentTimeMillis(); } } } } static { Thread thread = new Thread(new AllocThread(), "qclient-redis-allocator"); thread.setDaemon(true); thread.start(); } public static ByteBuf alloc() { try { return bufferQueue.take(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RedisRuntimeException("alloc interrupt"); } } public static void release(ByteBuf buf) { toCleanQueue.add(buf); } }
在業務線程里調用alloc,從queue里拿到專用的線程分配好的buffer。在將buffer寫出到socket之后再調用release回收:
//業務線程
ByteBuf buffer = Allocator.alloc();
//序列化
........
//寫出
ChannelPromise promise = channel.newPromise();
promise.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
//buffer已經輸出,可以回收,交給專用線程回收
Allocator.release(buffer);
}
});
//進入EventLoop
channel.write(buffer, promise);
好像問題解決了。而且我們通過壓測發現性能果然有提升,內存占用也很正常,通過寫出各種不同大小的buffer進行了幾番測試結果都很OK。
不過你如果再提高每次寫出包的大小的時候,問題就出現了。在我這個版本的netty里,ByteBufAllocator.buffer()分配的buffer默認大小是256個字節,當你將對象往這個buffer里序列化的時候,如果超過了256個字節ByteBuf就會自動擴展,而對於PooledByteBuf來說,自動擴展是會去池里取一個,然后將舊的回收掉。而這一切都是在業務線程里進行的。意味着你使用專用的線程來做分配和回收功虧一簣。
上面三個問題就好像冥冥之中,有一雙看不見的手將你一步一步帶入深淵,最后讓你絕望。一個問題引出一個必然的解決方案,而這個解決方案看起來將問題解決了,但卻是將問題隱藏地更深。
如果說前面三個問題是因為你不熟悉Netty的新機制造成的,那么下面這個問題我覺得就是Netty本身的API設計不合理導致使用的人出現這個問題了。
4. 連接超時
在網絡應用中,超時往往是最后一道防線,或是最后一根稻草。我們不怕干脆利索的宕機,怕就怕要死不活。當碰到要死不活的應用的時候往往就是依靠超時了。
在使用Netty編寫客戶端的時候,我們一般會有類似這樣的代碼:
bootstrap.connect(address).await(1000, TimeUnit.MILLISECONDS)
向對端發起一個連接,超時等待1秒鍾。如果1秒鍾沒有連接上則重連或者做其他處理。而其實在bootstrap的選項里,還有這樣的一項:
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
如果這兩個值設置的不一致,在await的時候較短,而option里設置的較長就出問題了。這個時候你會發現connect里已經超時了,你以為連接失敗了,但實際上await超時Netty並不會幫你取消正在連接的鏈接。這個時候如果第2秒的時候連上了對端服務器,那么你剛才的判斷就失誤了。如果你根據connect(address).await(1000, TimeUnit.MILLISECONDS)來決定是否重連,很有可能你就建立了兩個連接,而且很有可能你的handler就在這兩個channel里共享起來了,這就有可能讓你產生:哎呀,Netty的handler不是在單線程里執行的這樣的假象。所以我的建議是,不要在await上設置超時,而總是使用option上的選項來設置。這個更准確些,超時了就是真的表示沒有連上。
5. 異步處理,流控先行
這個坑其實也不算坑,只是因為懶,該做的事情沒做。一般來講我們的業務如果比較小的時候我們用同步處理,等業務到一定規模的時候,一個優化手段就是異步化。異步化是提高吞吐量的一個很好的手段。但是,與異步相比,同步有天然的負反饋機制,也就是如果后端慢了,前面也會跟着慢起來,可以自動的調節。但是異步就不同了,異步就像決堤的大壩一樣,洪水是暢通無阻。如果這個時候沒有進行有效的限流措施就很容易把后端沖垮。如果一下子把后端沖垮倒也不是最壞的情況,就怕把后端沖的要死不活。這個時候,后端就會變得特別緩慢,如果這個時候前面的應用使用了一些無界的資源等,就有可能把自己弄死。那么現在要介紹的這個坑就是關於Netty里的ChannelOutboundBuffer這個東西的。這個buffer是用在netty向channel write數據的時候,有個buffer緩沖,這樣可以提高網絡的吞吐量(每個channel有一個這樣的buffer)。初始大小是32(32個元素,不是指字節),但是如果超過32就會翻倍,一直增長。大部分時候是沒有什么問題的,但是在碰到對端非常慢(對端慢指的是對端處理TCP包的速度變慢,比如對端負載特別高的時候就有可能是這個情況)的時候就有問題了,這個時候如果還是不斷地寫數據,這個buffer就會不斷地增長,最后就有可能出問題了(我們的情況是開始吃swap,最后進程被linux killer干掉了)。
為什么說這個地方是坑呢,因為大部分時候我們往一個channel寫數據會判斷channel是否active,但是往往忽略了這種慢的情況。
那這個問題怎么解決呢?其實ChannelOutboundBuffer雖然無界,但是可以給它配置一個高水位線和低水位線,當buffer的大小超過高水位線的時候對應channel的isWritable就會變成false,當buffer的大小低於低水位線的時候,isWritable就會變成true。所以應用應該判斷isWritable,如果是false就不要再寫數據了。高水位線和低水位線是字節數,默認高水位是64K,低水位是32K,我們可以根據我們的應用需要支持多少連接數和系統資源進行合理規划。
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024)
在使用一些開源的框架上還真是要熟悉人家的實現機制,然后才可以大膽的使用啊,不然被坑死都覺得自己很冤枉
Netty中的坑(下篇)
其實這篇應該叫Netty實踐,但是為了與前一篇名字保持一致,所以還是用一下坑這個名字吧。
Netty是高性能Java NIO網絡框架,在很多開源系統里都有她的身影,而在絕大多數互聯網公司所實施的服務化,以及最近流行的MicroService中,她都作為基礎中的基礎出現。
Netty的出現讓我們可以簡單容易地就可以使用NIO帶來的高性能網絡編程的潛力。她用一種統一的流水線方式組織我們的業務代碼,將底層網絡繁雜的細節隱藏起來,讓我們只需要關注業務代碼即可。並且用這種機制將不同的業務划分到不同的handler里,比如將編碼,連接管理,業務邏輯處理進行分開。Netty也力所能及的屏蔽了一些NIO bug,比如著名的epoll cpu 100% bug。而且,還提供了很多優化支持,比如使用buffer來提高網絡吞吐量。
但是,和所有的框架一樣,框架為我們屏蔽了底層細節,讓我們可以很快上手。但是,並不表示我們不需要對框架所屏蔽的那一層進行了解。本文所涉及的幾個地方就是Netty與底層網絡結合的幾個地方,看看我們使用的時候應該怎么處理,以及為什么要這么處理。
autoread
在Netty 4里我覺得一個很有用的功能是autoread。autoread是一個開關,如果打開的時候Netty就會幫我們注冊讀事件(這個需要對NIO有些基本的了解)。當注冊了讀事件后,如果網絡可讀,則Netty就會從channel讀取數據,然后我們的pipeline就會開始流動起來。那如果autoread關掉后,則Netty會不注冊讀事件,這樣即使是對端發送數據過來了也不會觸發讀時間,從而也不會從channel讀取到數據。那么這樣一個功能到底有什么作用呢?
它的作用就是更精確的速率控制。那么這句話是什么意思呢?比如我們現在在使用Netty開發一個應用,這個應用從網絡上發送過來的數據量非常大,大到有時我們都有點處理不過來了。而我們使用Netty開發應用往往是這樣的安排方式:Netty的Worker線程處理網絡事件,比如讀取和寫入,然后將讀取后的數據交給pipeline處理,比如經過反序列化等最后到業務層。到業務層的時候如果業務層有阻塞操作,比如數據庫IO等,可能還要將收到的數據交給另外一個線程池處理。因為我們絕對不能阻塞Worker線程,一旦阻塞就會影響網絡處理效率,因為這些Worker是所有網絡處理共享的,如果這里阻塞了,可能影響很多channel的網絡處理。
但是,如果把接到的數據交給另外一個線程池處理就又涉及另外一個問題:速率匹配。
比如現在網絡實在太忙了,接收到很多數據交給線程池。然后就出現兩種情況:
1. 由於開發的時候沒有考慮到,這個線程池使用了某些無界資源。比如很多人對ThreadPoolExecutor的幾個參數不是特別熟悉,就有可能用錯,最后導致資源無節制使用,整個系統crash掉。
//比如開始的時候沒有考慮到會有這么大量//這種方式線程數是無界的,那么有可能創建大量的線程對系統穩定性造成影響Executor executor = Executors.newCachedTheadPool(); executor.execute(requestWorker);//或者使用這個//這種queue是無界的,有可能會消耗太多內存,對系統穩定性造成影響Executor executor = Executors.newFixedThreadPool(8); executor.execute(requestWorker);
2. 第二種情況就是限制了資源使用,所以只好把最老的或最新的數據丟棄。
//線程池滿后,將最老的數據丟棄Executor executor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1000), namedFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
其實上面兩種情況,不管哪一種都不是太合理。不過在Netty 4里我們就有了更好的解決辦法了。如果我們的線程池暫時處理不過來,那么我們可以將autoread關閉,這樣Netty就不再從channel上讀取數據了。那么這樣造成的影響是什么呢?這樣socket在內核那一層的read buffer就會滿了。因為TCP默認就是帶flow control的,read buffer變小之后,向對端發送ACK的時候,就會降低窗口大小,直至變成0,這樣對端就會自動的降低發送數據的速率了。等到我們又可以處理數據了,我們就可以將autoread又打開這樣數據又源源不斷的到來了。
這樣整個系統就通過TCP的這個負反饋機制,和諧的運行着。那么autoread涉及的網絡知識就是,發送端會根據對端ACK時候所攜帶的advertises window來調整自己發送的數據量。而ACK里的這個window的大小又跟接收端的read buffer有關系。而不注冊讀事件后,read buffer里的數據沒有被消費掉,就會達到控制發送端速度的目的。
不過設計關閉和打開autoread的策略也要注意,不要設計成我們不能處理任何數據了就立即關閉autoread,而我們開始能處理了就立即打開autoread。這個地方應該留一個緩沖地帶。也就是如果現在排隊的數據達到我們預設置的一個高水位線的時候我們關閉autoread,而低於一個低水位線的時候才打開autoread。不這么弄的話,有可能就會導致我們的autoread頻繁打開和關閉。autoread的每次調整都會涉及系統調用,對性能是有影響的。類似下面這樣一個代碼,在將任務提交到線程池之前,判斷一下現在的排隊量(注:本文的所有數字純為演示作用,所有線程池,隊列等大小數據要根據實際業務場景仔細設計和考量)。
int highReadWaterMarker = 900;int lowReadWaterMarker = 600; ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1000), namedFactory, new ThreadPoolExecutor.DiscardOldestPolicy());int queued = executor.getQueue().size();if(queued > highReadWaterMarker){ channel.config().setAutoRead(false); }if(queued < lowReadWaterMarker){ channel.config().setAutoRead(true); }
但是使用autoread也要注意一件事情。autoread如果關閉后,對端發送FIN的時候,接收端應用層也是感知不到的。這樣帶來一個后果就是對端發送了FIN,然后內核將這個socket的狀態變成CLOSE_WAIT。但是因為應用層感知不到,所以應用層一直沒有調用close。這樣的socket就會長期處於CLOSE_WAIT狀態。特別是一些使用連接池的應用,如果將連接歸還給連接池后,一定要記着autoread一定是打開的。不然就會有大量的連接處於CLOSE_WAIT狀態。
其實所有異步的場合都存在速率匹配的問題,而同步往往不存在這樣的問題,因為同步本身就是帶負反饋的。
isWritable
isWritable其實在上一篇文章已經介紹了一點,不過這里我想結合網絡層再啰嗦一下。上面我們講的autoread一般是接收端的事情,而發送端也有速率控制的問題。Netty為了提高網絡的吞吐量,在業務層與socket之間又增加了一個ChannelOutboundBuffer。在我們調用channel.write的時候,所有寫出的數據其實並沒有寫到socket,而是先寫到ChannelOutboundBuffer。當調用channel.flush的時候才真正的向socket寫出。因為這中間有一個buffer,就存在速率匹配了,而且這個buffer還是無界的。也就是你如果沒有控制channel.write的速度,會有大量的數據在這個buffer里堆積,而且如果碰到socket又『寫不出』數據的時候,很有可能的結果就是資源耗盡。而且這里讓這個事情更嚴重的是ChannelOutboundBuffer很多時候我們放到里面的是DirectByteBuffer,什么意思呢,意思是這些內存是放在GC Heap之外。如果我們僅僅是監控GC的話還監控不出來這個隱患。
那么說到這里,socket什么時候會寫不出數據呢?在上一節我們了解到接收端有一個read buffer,其實發送端也有一個send buffer。我們調用socket的write的時候其實是向這個send buffer寫數據,如果寫進去了就表示成功了(所以這里千萬不能將socket.write調用成功理解成數據已經到達接收端了),如果send buffer滿了,對於同步socket來講,write就會阻塞直到超時或者send buffer又有空間(這么一看,其實我們可以將同步的socket.write理解為半同步嘛)。對於異步來講這里是立即返回的。
那么進入send buffer的數據什么時候會減少呢?是發送到網絡的數據就會從send buffer里去掉么?也不是這個樣子的。還記得TCP有重傳機制么,如果發送到網絡的數據都從send buffer刪除了,那么這個數據沒有得到確認TCP怎么重傳呢?所以send buffer的數據是等到接收端回復ACK確認后才刪除。那么,如果接收端非常慢,比如CPU占用已經到100%了,而load也非常高的時候,很有可能來不及處理網絡事件,這個時候send buffer就有可能會堆滿。這就導致socket寫不出數據了。而發送端的應用層在發送數據的時候往往判斷socket是不是有效的(是否已經斷開),而忽略了是否可寫,這個時候有可能就還一個勁的寫數據,最后導致ChannelOutboundBuffer膨脹,造成系統不穩定。
所以,Netty已經為我們考慮了這點。channel有一個isWritable屬性,可以來控制ChannelOutboundBuffer,不讓其無限制膨脹。至於isWritable的實現機制可以參考前一篇。
序列化
所有講TCP的書都會有這么一個介紹:TCP provides a connection-oriented, reliable, byte stream service。前面兩個這里就不關心了,那么這個byte stream到底是什么意思呢?我們在發送端發送數據的時候,對於應用層來說我們發送的是一個個對象,然后序列化成一個個字節數組,但無論怎樣,我們發送的是一個個『包』。每個都是獨立的。那么接收端是不是也像發送端一樣,接收到一個個獨立的『包』呢?很遺憾,不是的。這就是byte stream的意思。接收端沒有『包』的概念了。
這對於應用層編碼的人員來說可能有點困惑。比如我使用Netty開發,我的handler的channelRead這次明明傳遞給我的是一個ByteBuf啊,是一個『獨立』的包啊,如果是byte stream的話難道不應該傳遞我一個Stream么。但是這個ByteBuf和發送端的ByteBuf一點關系都沒有。比如:
public class Decorder extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //這里的msg和發送端channel.write(msg)時候的msg沒有任何關系 } }
這個ByteBuf可能包含發送端多個ByteBuf,也可能只包含發送端半個ByteBuf。但是別擔心,TCP的可靠性會確保接收端的順序和發送端的順序是一致的。這樣的byte stream協議對我們的反序列化工作就帶來了一些挑戰。在反序列化的時候我們要時刻記着這一點。對於半個ByteBuf我們按照設計的協議如果解不出一個完整對象,我們要留着,和下次收到的ByteBuf拼湊在一起再次解析,而收到的多個ByteBuf我們要根據協議解析出多個完整對象,而很有可能最后一個也是不完整的。不過幸運的是,我們有了Netty。Netty為我們已經提供了很多種協議解析的方式,並且對於這種半包粘包也已經有考慮,我們可以參考ByteToMessageDecoder以及它的一連串子類來實現自己的反序列化機制。而在反序列化的時候我們可能經常要取ByteBuf中的一個片段,這個時候建議使用ByteBuf的readSlice方法而不是使用copy。
另外,Netty還提供了兩個ByteBuf的流封裝:ByteBufInputStream, ByteBufOutputStream。比如我們在使用一些序列化工具,比如Hessian之類的時候,我們往往需要傳遞一個InputStream(反序列化),OutputStream(序列化)到這些工具。而很多協議的實現都涉及大量的內存copy。比如對於反序列化,先將ByteBuf里的數據讀取到byte[],然后包裝成ByteArrayInputStream,而序列化的時候是先將對象序列化成ByteArrayOutputStream再copy到ByteBuf。而使用ByteBufInputStream和ByteBufOutputStream就不再有這樣的內存拷貝了,大大節約了內存開銷。
另外,因為socket.write和socket.read都需要一個direct byte buffer(即使你傳入的是一個heap byte buffer,socket內部也會將內容copy到direct byte buffer)。如果我們直接使用ByteBufInputStream和ByteBufOutputStream封裝的direct byte buffer再加上Netty 4的內存池,那么內存將更有效的使用。這里提一個問題:為什么socket.read和socket.write都需要direct byte buffer呢?heap byte buffer不行么?
總結起來,對於序列化和反序列化來講就是兩條:1 減少內存拷貝 2 處理好TCP的粘包和半包問題
后記
作為一個應用層程序員,往往是幸福的。因為我們有豐富的框架和工具為我們屏蔽下層的細節,這樣我們可以更容易的解決很多業務問題。但是目前程序設計並沒有發展到不需要了解所有下層的知識就可以寫出更有效率的程序,所以我們在使用一個框架的時候最好要對它所屏蔽和所依賴的知識進行一些了解,這樣在碰到一些問題的時候我們可以根據這些理論知識去分析原因。這就是理論和實踐的相結合。