
【ChannelPromise作用:可以設置success或failure 是為了通知ChannelFutureListener】
Netty的數據處理API通過兩個組件暴露——abstract class ByteBuf和interface ByteBufHolder。
下面是一些ByteBuf API的優點:
它可以被用戶自定義的緩沖區類型擴展;
通過內置的復合緩沖區類型實現了透明的零拷貝;
容量可以按需增長(類似於JDK的StringBuilder);
在讀和寫這兩種模式之間切換不需要調用ByteBuffer的flip()方法;
讀和寫使用了不同的索引;
支持方法的鏈式調用;
支持引用計數;
支持池化。
使用不同的讀索引和寫索引來控制數據訪問;
readerIndex達到和writerIndex
使用內存的不同方式——基於字節數組和直接緩沖區;
通過CompositeByteBuf生成多個ByteBuf的聚合視圖;
數據訪問方法——搜索、切片以及復制;
隨機訪問索引 【0到capacity() - 1】
順序訪問索引
可丟棄字節 【discardReadBytes() clear()改變index值】
可讀字節【readBytes(ByteBuf dest) 】
可寫字節【writeBytes(ByteBuf dest);】
索引管理【markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex( readerIndex(int)或者writerIndex(int) 】
查找操作【buf.indexOf(),forEachByte(ByteBufProcessor.FIND_NUL), int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);】
派生緩沖區【 返回新的buf,都具有 readIndex writeIndex markIndex
buf.duplicate();
ByteBuf rep = buf.copy();//創建副本
buf.slice();//操作buf分段
buf.slice(0, 5);//操作buf分段】
讀、寫、獲取和設置API;
讀/寫操作【get()和set()操作,從給定的索引開始,並且保持索引不變;read()和write()操作,從給定的索引開始,並且會根據已經訪問過的字節數對索引進行調整】
ByteBufAllocator池化
ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能減少碎片,高效分配算法
ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建
引用計數
ByteBufAllocator allocator = ctx.channel().alloc();
ByteBuf directBuf = allocator.directBuffer();
if(directBuf.refCnt() == 1){//當引用技術為1時釋放對象
directBuf.release();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
logger.info("channelRead start");
ByteBuf buf = (ByteBuf) msg;
if (buf.hasArray()) {//檢查buf是否支持一個數組
byte[] array = buf.array();
//第一個偏移量
int off = buf.arrayOffset() + buf.readerIndex();
//獲取可讀取字節
int len = buf.readableBytes();
byte[] buffer = new byte[len];
buf.getBytes(off, buffer);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
ByteBuf header = (ByteBuf) msg;
ByteBuf body = (ByteBuf) msg;
compositeByteBuf.addComponent(header);
compositeByteBuf.addComponent(body);
compositeByteBuf.removeComponent(0);
for (ByteBuf bufer : compositeByteBuf) {
System.out.println(bufer.toString());
}
int comLen = compositeByteBuf.readableBytes();
for (int i = 0; i < buf.capacity(); i++) {
System.out.println((char) buf.getByte(i));
//讀完成后進行丟棄
buf.discardReadBytes();
//或者調用clear
buf.clear();
}
//標記索引
buf.readerIndex(2);
buf.writeByte(2);
//重置索引
buf.markReaderIndex();
buf.markWriterIndex();
buf.resetReaderIndex();
buf.resetWriterIndex();
//查找
buf.indexOf(0, 5, (byte) 0);
int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);
int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);
//派生緩沖區 返回新的buf,都具有 readIndex writeIndex markIndex
buf.duplicate();
ByteBuf rep = buf.copy();//創建副本
buf.slice();//操作buf分段
buf.slice(0, 5);//操作buf分段
//==========數據訪問方法——搜索、切片以及復制;
Charset charset = Charset.forName("UTF-8");
ByteBuf buf1 = Unpooled.copiedBuffer("Netty in Action rocks!", charset); //← -- 創建一個用於保存給定字符串的字節的ByteBuf
ByteBuf sliced = buf1.slice(0, 15); //← -- 創建該ByteBuf 從索引0 開始到索引15結束的一個新切片
System.out.println(sliced.toString(charset)); // ← -- 將打印“Netty in Action”
buf1.setByte(0, (byte) 'J'); //← -- 更新索引0 處的字節
assert buf1.getByte(0) == sliced.getByte(0); //← -- 將會成功,因為數據是共享的,對其中一個所做的更改對另外一個也是可見的
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf2 = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); // ← -- 創建ByteBuf 以保存所提供的字符串的字節
ByteBuf copy = buf2.copy(0, 15);// ← -- 創建該ByteBuf 從索引0 開始到索引15結束的分段的副本
System.out.println(copy.toString(utf8));// ← -- 將打印“Netty in Action”
buf2.setByte(0, (byte) 'J');// ← -- 更新索引0 處的字節
assert buf2.getByte(0) != copy.getByte(0);// ← -- 將會成功,因為數據不是共享的
Unpooled.unmodifiableBuffer(buf);
buf.order();
buf.readSlice(1);
//使用不同的讀索引和寫索引來控制數據訪問;
//讀寫操作 get/set不改變索引位置 read/write改變索引(readIndex/writeIndex)位置
Charset u8 = Charset.forName("UTF-8");
ByteBuf getSetBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 創建一個新的ByteBuf以保存給定字符串的字節
System.out.println((char) getSetBuf.getByte(0));// 打印第一個字符'N'
int readerIndex = getSetBuf.readerIndex(); // 存儲當前的readerIndex 和writerIndex
int writerIndex = getSetBuf.writerIndex();
getSetBuf.setByte(0, (byte) 'B'); // 將索引0 處的字節更新為字符'B'
System.out.println((char) getSetBuf.getByte(0)); // 打印第一個字符,現在是'B'
assert readerIndex == getSetBuf.readerIndex();// 將會成功,因為這些操作並不會修改相應的索引
assert writerIndex == getSetBuf.writerIndex();
ByteBuf readWriteBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 創建一個新的ByteBuf以保存給定字符串的字節
System.out.println((char) readWriteBuf.readByte());// 打印第一個字符'N'
System.out.println((boolean) readWriteBuf.readBoolean());// 讀取當前boolean值,並將readIndex+1
readWriteBuf.writeByte('F');// 將字符F追加到緩沖區中,並將writeIndex+1
int reIndex = readWriteBuf.readerIndex(); // 存儲當前的readerIndex 和writerIndex
int wrIndex = readWriteBuf.writerIndex();
readWriteBuf.setByte(0, (byte) 'B'); // 將索引0 處的字節更新為字符'B'
System.out.println((char) readWriteBuf.getByte(0)); // 打印第一個字符,現在是'B'
assert reIndex == readWriteBuf.readerIndex();// 將會成功,因為這些操作並不會修改相應的索引
assert wrIndex == readWriteBuf.writerIndex();
buf.isReadable();//至少有一個字符可讀,返回true
buf.isWritable();//至少有一個字節可被寫入,返回true
int readableByte = buf.readableBytes();//返回可被讀取的字節數
int writableByte = buf.writableBytes();//返回可被寫入的字節數
int capacity = buf.capacity();//返回可容納的字節數
buf.maxCapacity();//返回可容納的最大字節數
buf.hasArray();//如果buf由一個字節數組支撐,返回true
buf.array();//將buf轉換為字節數組
//除了數據外,還有一些其他的屬性,如http的狀態碼,cookie等
ByteBufHolder byteBufHolder = new DefaultLastHttpContent();
ByteBuf httpContent = byteBufHolder.content();//返回一個http格式的ByteBuf
ByteBufHolder copyBufHolder = byteBufHolder.copy();//深拷貝,不共享
ByteBufHolder duplicateBufHolder = byteBufHolder.duplicate();//淺拷貝,共享
//ByteBufAllocator ByteBuf分配
// buffer()基於堆或直接內存的buf
//ioBuffer() 返回一個iobuf
//heapBuffer 堆buf
//directBuffer 直接buf
//compositeBuffer compositeHeapBuffer compositeDirectBuffer 復合buf
ByteBufAllocator ctxAllocator = ctx.alloc();
ByteBufAllocator channelAllocator = ctx.channel().alloc();
ctxAllocator.buffer();
ctxAllocator.ioBuffer();
ctxAllocator.compositeBuffer();
ctxAllocator.heapBuffer();
ctxAllocator.directBuffer();
ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能減少碎片,高效分配算法
ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建
ctx.writeAndFlush(new byte[10]);
ctx.writeAndFlush(Unpooled.copiedBuffer(new byte[10]));//writeAndFlush參數是Object,使用非池化技術轉為buf提升效率
//工具類ByteBufUtil
ByteBufUtil.hexDump(buf);//可對buf進行轉換
ByteBufUtil.hexDump(new byte[9999]);//可對字節進行轉換
//引用計數:跟蹤特定對象的引用計數
ByteBufAllocator allocator = ctx.channel().alloc();
ByteBuf directBuf = allocator.directBuffer();
if(directBuf.refCnt() == 1){//當引用技術為1時釋放對象
directBuf.release();
}
if (buf.readableBytes() <= 0) {
ReferenceCountUtil.safeRelease(msg);
return;
}
byte[] msgContent = new byte[buf.readableBytes()];
buf.readBytes(msgContent);
logger.info("recv from client:length: {},toHexString: {}\n", buf.readableBytes(), HexStringUtils.toHexString(buf.array()));
if (buf.getByte(0) == 0x7e && buf.getByte(buf.readableBytes() - 1) == 0x7e) {}
if(msgContent[0] == 0x7e && msgContent[msgContent.length-1]==0x7e){}
}
}
//通常如果集成ChannelInboundHandlerAdapter時,復寫channelRead(),需要進行手動的消息釋放
//消息消費完成后自動釋放
private void release(Object msg) {
try {
ReferenceCountUtil.release(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
===================================================
//在SimpleChannelInboundHandler中的channelRead0()方法中自動添加了釋放消息的方法
//SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
