Java NIO系列教程(三) Channel之Socket通道


目錄:

Java NIO系列教程(二) Channel

Java NIO系列教程(三) Channel之Socket通道

 

 

在《Java NIO系列教程(二) Channel》介紹了FileChannel,本章節介紹socket通道類。

一、Socket通道

  新的socket通道類可以運行非阻塞模式並且是可選擇的。這兩個性能可以激活大程序(如網絡服務器和中間件組件)巨大的可伸縮性和靈活性。本節中我們會看到,再也沒有為每個socket連接使用一個線程的必要了,也避免了管理大量線程所需的上下文交換總開銷。借助新的NIO類,一個或幾個線程就可以管理成百上千的活動socket連接了並且只有很少甚至可能沒有性能損失。所有的socket通道類(DatagramChannel、SocketChannel和ServerSocketChannel)都繼承了位於java.nio.channels.spi包中的AbstractSelectableChannel。這意味着我們可以用一個Selector對象來執行socket通道的就緒選擇(readiness selection)。

  請注意DatagramChannel和SocketChannel實現定義讀和寫功能的接口而ServerSocketChannel不實現。ServerSocketChannel負責監聽傳入的連接和創建新的SocketChannel對象,它本身從不傳輸數據

  在我們具體討論每一種socket通道前,您應該了解socket和socket通道之間的關系。之前的章節中有寫道,通道是一個連接I/O服務導管並提供與該服務交互的方法。就某個socket而言,它不會再次實現與之對應的socket通道類中的socket協議API,而java.net中已經存在的socket通道都可以被大多數協議操作重復使用。

  全部socket通道類(DatagramChannel、SocketChannel和ServerSocketChannel)在被實例化時都會創建一個對等socket對象。這些是我們所熟悉的來自java.net的類(Socket、ServerSocket和DatagramSocket),它們已經被更新以識別通道。對等socket可以通過調用socket( )方法從一個通道上獲取。此外,這三個java.net類現在都有getChannel( )方法。

  Socket通道將與通信協議相關的操作委托給相應的socket對象。socket的方法看起來好像在通道類中重復了一遍,但實際上通道類上的方法會有一些新的或者不同的行為

  要把一個socket通道置於非阻塞模式,我們要依靠所有socket通道類的公有超級類:SelectableChannel。就緒選擇(readiness selection)是一種可以用來查詢通道的機制,該查詢可以判斷通道是否准備好執行一個目標操作,如讀或寫。非阻塞I/O和可選擇性是緊密相連的,那也正是管理阻塞模式的API代碼要在SelectableChannel超級類中定義的原因。

  設置或重新設置一個通道的阻塞模式是很簡單的,只要調用configureBlocking( )方法即可,傳遞參數值為true則設為阻塞模式,參數值為false值設為非阻塞模式。真的,就這么簡單!您可以通過調用isBlocking( )方法來判斷某個socket通道當前處於哪種模式。

AbstractSelectableChannel.java中實現的configureBlocking()方法如下:

    public final SelectableChannel configureBlocking(boolean block)
        throws IOException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (blocking == block)
                return this;
            if (block && haveValidKeys())
                throw new IllegalBlockingModeException();
            implConfigureBlocking(block);
            blocking = block;
        }
        return this;
    }

  非阻塞socket通常被認為是服務端使用的,因為它們使同時管理很多socket通道變得更容易。但是,在客戶端使用一個或幾個非阻塞模式的socket通道也是有益處的,例如,借助非阻塞socket通道,GUI程序可以專注於用戶請求並且同時維護與一個或多個服務器的會話。在很多程序上,非阻塞模式都是有用的。

  偶爾地,我們也會需要防止socket通道的阻塞模式被更改。API中有一個blockingLock( )方法,該方法會返回一個非透明的對象引用。返回的對象是通道實現修改阻塞模式時內部使用的。只有擁有此對象的鎖的線程才能更改通道的阻塞模式。

下面分別介紹這3個通道。

二、 ServerSocketChannel

讓我們從最簡單的ServerSocketChannel來開始對socket通道類的討論。以下是ServerSocketChannel的完整API:

public abstract class ServerSocketChannel extends AbstractSelectableChannel
  {
      public static ServerSocketChannel open() throws IOException;
      public abstract ServerSocket socket();
      public abstract ServerSocket accept()throws IOException;
      public final int validOps();
  }  

ServerSocketChannel是一個基於通道的socket監聽器。它同我們所熟悉的java.net.ServerSocket執行相同的基本任務,不過它增加了通道語義,因此能夠在非阻塞模式下運行

由於ServerSocketChannel沒有bind()方法,因此有必要取出對等的socket並使用它來綁定到一個端口以開始監聽連接。我們也是使用對等ServerSocket的API來根據需要設置其他的socket選項。

同它的對等體java.net.ServerSocket一樣,ServerSocketChannel也有accept( )方法。一旦您創建了一個ServerSocketChannel並用對等socket綁定了它,然后您就可以在其中一個上調用accept()。如果您選擇在ServerSocket上調用accept( )方法,那么它會同任何其他的ServerSocket表現一樣的行為:總是阻塞並返回一個java.net.Socket對象。如果您選擇在ServerSocketChannel上調用accept( )方法則會返回SocketChannel類型的對象,返回的對象能夠在非阻塞模式下運行

換句話說:

  • ServerSocketChannel的accept()方法會返回SocketChannel類型對象,SocketChannel可以在非阻塞模式下運行。
  • 其它Socket的accept()方法會阻塞返回一個Socket對象。

如果ServerSocketChannel以非阻塞模式被調用,當沒有傳入連接在等待時,ServerSocketChannel.accept( )會立即返回null。正是這種檢查連接而不阻塞的能力實現了可伸縮性並降低了復雜性。可選擇性也因此得到實現。我們可以使用一個選擇器實例來注冊一個ServerSocketChannel對象以實現新連接到達時自動通知的功能。以下代碼演示了如何使用一個非阻塞的accept( )方法:

package com.dxz.springsession.nio.demo2;

import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.InetSocketAddress;

public class ChannelAccept {
    public static final String GREETING = "Hello I must be going.\r\n";

    public static void main(String[] argv) throws Exception {
        int port = 1234; // default
        if (argv.length > 0) {
            port = Integer.parseInt(argv[0]);
        }
        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc.configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connections");
            SocketChannel sc = ssc.accept();
            if (sc == null) {
                System.out.println("null");
                Thread.sleep(2000);
            } else {
                System.out.println("Incoming connection from: " + sc.socket().getRemoteSocketAddress());
                buffer.rewind();
                sc.write(buffer);
                sc.close();
            }
        }
    }
}

日志:

2.1、打開 ServerSocketChannel

通過調用 ServerSocketChannel.open() 方法來打開ServerSocketChannel.如:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
2.2、關閉 ServerSocketChannel

通過調用ServerSocketChannel.close() 方法來關閉ServerSocketChannel. 如:

serverSocketChannel.close();
2.3、監聽新進來的連接

通過 ServerSocketChannel.accept() 方法監聽新進來的連接。當 accept()方法返回的時候,它返回一個包含新進來的連接的 SocketChannel。因此, accept()方法會一直阻塞到有新連接到達。

通常不會僅僅只監聽一個連接,在while循環中調用 accept()方法. 如下面的例子:

        while(true){
            SocketChannel socketChannel = serverSocketChannel.accept();
            //...
        }
2.4、阻塞模式

會在SocketChannel sc = ssc.accept();這里阻塞住進程。

2.5、非阻塞模式

ServerSocketChannel可以設置成非阻塞模式。在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連接,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null.如:

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc.configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connections");
            SocketChannel sc = ssc.accept();
            if(sc != null) {
                
            }

 

三、SocketChannel

下面開始學習SocketChannel,它是使用最多的socket通道類:

Java NIO中的SocketChannel是一個連接到TCP網絡套接字的通道。可以通過以下2種方式創建SocketChannel:

  • 打開一個SocketChannel並連接到互聯網上的某台服務器。
  • 一個新連接到達ServerSocketChannel時,會創建一個SocketChannel。
3.1、打開 SocketChannel

下面是SocketChannel的打開方式:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
3.2、關閉 SocketChannel

當用完SocketChannel之后調用SocketChannel.close()關閉SocketChannel:

socketChannel.close();
3.3、從 SocketChannel 讀取數據

要從SocketChannel中讀取數據,調用一個read()的方法之一。以下是例子:

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

  首先,分配一個Buffer。從SocketChannel讀取到的數據將會放到這個Buffer中。然后,調用SocketChannel.read()。該方法將數據從SocketChannel 讀到Buffer中。read()方法返回的int值表示讀了多少字節進Buffer里。如果返回的是-1,表示已經讀到了流的末尾(連接關閉了)。

3.4、寫入 SocketChannel

寫數據到SocketChannel用的是SocketChannel.write()方法,該方法以一個Buffer作為參數。示例如下:

String newData = "New String to write to file..." + System.currentTimeMillis();
 
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
 
buf.flip();
 
while(buf.hasRemaining()) {
    channel.write(buf);
}

  注意SocketChannel.write()方法的調用是在一個while循環中的。Write()方法無法保證能寫多少字節到SocketChannel。所以,我們重復調用write()直到Buffer沒有要寫的字節為止。

3.5、非阻塞模式

可以設置 SocketChannel 為非阻塞模式(non-blocking mode).設置之后,就可以在異步模式下調用connect(), read() 和write()了。

3.5.1、connect()

如果SocketChannel在非阻塞模式下,此時調用connect(),該方法可能在連接建立之前就返回了。為了確定連接是否建立,可以調用finishConnect()的方法。像這樣:

socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
 
while(! socketChannel.finishConnect() ){
    //wait, or do something else...
}
 
3.5.2、write()

非阻塞模式下,write()方法在尚未寫出任何內容時可能就返回了。所以需要在循環中調用write()。前面已經有例子了,這里就不贅述了。

3.5.3、read()

非阻塞模式下,read()方法在尚未讀取到任何數據時可能就返回了。所以需要關注它的int返回值,它會告訴你讀取了多少字節。

3.6、非阻塞模式與選擇器

非阻塞模式與選擇器搭配會工作的更好,通過將一或多個SocketChannel注冊到Selector,可以詢問選擇器哪個通道已經准備好了讀取,寫入等。Selector與SocketChannel的搭配使用會在后面詳講。

 

四、DatagramChannel

最后一個socket通道是DatagramChannel。正如SocketChannel對應Socket,ServerSocketChannel對應ServerSocket,每一個DatagramChannel對象也有一個關聯的DatagramSocket對象。不過原命名模式在此並未適用:“DatagramSocketChannel”顯得有點笨拙,因此采用了簡潔的“DatagramChannel”名稱。

正如SocketChannel模擬連接導向的流協議(如TCP/IP),DatagramChannel則模擬包導向的無連接協議(如UDP/IP)。

DatagramChannel是無連接的。每個數據報(datagram)都是一個自包含的實體,擁有它自己的目的地址及不依賴其他數據報的數據負載。與面向流的的socket不同,DatagramChannel可以發送單獨的數據報給不同的目的地址。同樣,DatagramChannel對象也可以接收來自任意地址的數據包。每個到達的數據報都含有關於它來自何處的信息(源地址)。

4.1、打開 DatagramChannel

下面是 DatagramChannel 的打開方式:

    DatagramChannel channel = DatagramChannel.open();
    channel.socket().bind(new InetSocketAddress(9999));
 

這個例子打開的 DatagramChannel可以在UDP端口9999上接收數據包。

4.2、接收數據

通過receive()方法從DatagramChannel接收數據,如:

    ByteBuffer buf = ByteBuffer.allocate(48);
    buf.clear();
    channel.receive(buf);

receive()方法會將接收到的數據包內容復制到指定的Buffer. 如果Buffer容不下收到的數據,多出的數據將被丟棄。

4.3、發送數據

通過send()方法從DatagramChannel發送數據,如:

String newData = "New String to write to file..." + System.currentTimeMillis();
     
    ByteBuffer buf = ByteBuffer.allocate(48);
    buf.clear();
    buf.put(newData.getBytes());
    buf.flip();
     
    int bytesSent = channel.send(buf, new   InetSocketAddress("jenkov.com", 80));

這個例子發送一串字符到”jenkov.com”服務器的UDP端口80。 因為服務端並沒有監控這個端口,所以什么也不會發生。也不會通知你發出的數據包是否已收到,因為UDP在數據傳送方面沒有任何保證。

4.4、連接到特定的地址

可以將DatagramChannel“連接”到網絡中的特定地址的。由於UDP是無連接的,連接到特定地址並不會像TCP通道那樣創建一個真正的連接。而是鎖住DatagramChannel ,讓其只能從特定地址收發數據。

這里有個例子:

channel.connect(new InetSocketAddress("jenkov.com", 80));

當連接后,也可以使用read()和write()方法,就像在用傳統的通道一樣。只是在數據傳送方面沒有任何保證。這里有幾個例子:

    int bytesRead = channel.read(buf);
    int bytesWritten = channel.write(but);

 完整示例:

package com.dxz.springsession.nio.demo3;

import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.*;

public class DatagramChannelServerDemo {
    // UDP協議服務端
    private int port = 9975;
    DatagramChannel channel;
    private Charset charset = Charset.forName("UTF-8");
    private Selector selector = null;

    public DatagramChannelServerDemo() throws IOException {
        try {
            selector = Selector.open();
            channel = DatagramChannel.open();
        } catch (Exception e) {
            selector = null;
            channel = null;
            System.out.println("超時");
        }
        System.out.println("服務器啟動");
    }

    /* 編碼過程 */
    public ByteBuffer encode(String str) {
        return charset.encode(str);
    }

    /* 解碼過程 */
    public String decode(ByteBuffer bb) {
        return charset.decode(bb).toString();
    }

    /* 服務器服務方法 */
    public void service() throws IOException {
        if (channel == null || selector == null)
            return;
        channel.configureBlocking(false);
        channel.socket().bind(new InetSocketAddress(port));
        // channel.write(ByteBuffer.wrap(new String("aaaa").getBytes()));
        channel.register(selector, SelectionKey.OP_READ);
        /** 外循環,已經發生了SelectionKey數目 */
        while (selector.select() > 0) {
            System.out.println("有新channel加入");
            /* 得到已經被捕獲了的SelectionKey的集合 */
            Iterator iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = null;
                try {
                    key = (SelectionKey) iterator.next();
                    iterator.remove();

                    if (key.isReadable()) {
                        reveice(key);
                    }
                    if (key.isWritable()) {
                        // send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    try {
                        if (key != null) {
                            key.cancel();
                            key.channel().close();
                        }
                    } catch (ClosedChannelException cex) {
                        e.printStackTrace();
                    }
                }
            }
            /* 內循環完 */
        }
        /* 外循環完 */
    }

    /*
     * 接收 用receive()讀IO 作為服務端一般不需要調用connect(),如果未調用<span style=
     * "font-family: Arial, Helvetica, sans-serif;">connect()時調</span><span
     * style="font-family: Arial, Helvetica, sans-serif;"
     * >用read()\write()讀寫,會報java.nio.channels</span> .NotYetConnectedException
     * 只有調用connect()之后,才能使用read和write.
     */
    synchronized public void reveice(SelectionKey key) throws IOException {
        if (key == null)
            return;
        // ***用channel.receive()獲取客戶端消息***//
        // :接收時需要考慮字節長度
        DatagramChannel sc = (DatagramChannel) key.channel();
        String content = "";
        // create buffer with capacity of 48 bytes
        ByteBuffer buf = ByteBuffer.allocate(1024);// java里一個(utf-8)中文3字節,gbk中文占2個字節
        buf.clear();
        SocketAddress address = sc.receive(buf); // read into buffer. 返回客戶端的地址信息
        String clientAddress = address.toString().replace("/", "").split(":")[0];
        String clientPost = address.toString().replace("/", "").split(":")[1];

        buf.flip(); // make buffer ready for read
        while (buf.hasRemaining()) {
            buf.get(new byte[buf.limit()]);// read 1 byte at a time
            content += new String(buf.array());
        }
        buf.clear(); // make buffer ready for writing
        System.out.println("接收:" + content.trim());
        // 第一次發;udp采用數據報模式,發送多少次,接收多少次
        ByteBuffer buf2 = ByteBuffer.allocate(65507);
        buf2.clear();
        buf2.put(
                "消息推送內容 abc..UDP是一個非連接的協議,傳輸數據之前源端和終端不建立連接,當它想傳送時就簡單地去抓取來自應用程序的數據,並盡可能快地把它扔到網絡上。在發送端UDP是一個非連接的協議,傳輸數據之前源端和終端不建立連接,當它想傳送時就簡單地去抓取來自應用程序的數據,並盡可能快地把它扔到網絡上。在發送端UDP是一個非連接的協議,傳輸數據之前源端和終端不建立連接,當它想傳送時就簡單地去抓取來自應用程序的數據,並盡可能快地把它扔到網絡上。在發送端@Q"
                        .getBytes());
        buf2.flip();
        channel.send(buf2, new InetSocketAddress(clientAddress, Integer.parseInt(clientPost))); // 將消息回送給客戶端

        // 第二次發
        ByteBuffer buf3 = ByteBuffer.allocate(65507);
        buf3.clear();
        buf3.put("任務完成".getBytes());
        buf3.flip();
        channel.send(buf3, new InetSocketAddress(clientAddress, Integer.parseInt(clientPost))); // 將消息回送給客戶端
    }

    int y = 0;

    public void send(SelectionKey key) {
        if (key == null)
            return;
        // ByteBuffer buff = (ByteBuffer) key.attachment();
        DatagramChannel sc = (DatagramChannel) key.channel();
        try {
            sc.write(ByteBuffer.wrap(new String("aaaa").getBytes()));
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        System.out.println("send2() " + (++y));
    }

    /* 發送文件 */
    public void sendFile(SelectionKey key) {
        if (key == null)
            return;
        ByteBuffer buff = (ByteBuffer) key.attachment();
        SocketChannel sc = (SocketChannel) key.channel();
        String data = decode(buff);
        if (data.indexOf("get") == -1)
            return;
        String subStr = data.substring(data.indexOf(" "), data.length());
        System.out.println("截取之后的字符串是 " + subStr);
        FileInputStream fileInput = null;
        try {
            fileInput = new FileInputStream(subStr);
            FileChannel fileChannel = fileInput.getChannel();
            fileChannel.transferTo(0, fileChannel.size(), sc);
            fileChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fileInput.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new DatagramChannelServerDemo().service();
    }
}

//客戶端
package com.dxz.springsession.nio.demo3;

import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.*;

public class DatagramChannelClientDemo {
    // UDP協議客戶端
    private String serverIp = "127.0.0.1";
    private int port = 9975;
    // private ServerSocketChannel serverSocketChannel;
    DatagramChannel channel;
    private Charset charset = Charset.forName("UTF-8");
    private Selector selector = null;

    public DatagramChannelClientDemo() throws IOException {
        try {
            selector = Selector.open();
            channel = DatagramChannel.open();
        } catch (Exception e) {
            selector = null;
            channel = null;
            System.out.println("超時");
        }
        System.out.println("客戶器啟動");
    }

    /* 編碼過程 */
    public ByteBuffer encode(String str) {
        return charset.encode(str);
    }

    /* 解碼過程 */
    public String decode(ByteBuffer bb) {
        return charset.decode(bb).toString();
    }

    /* 服務器服務方法 */
    public void service() throws IOException {
        if (channel == null || selector == null)
            return;
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress(serverIp, port));// 連接服務端
        channel.write(ByteBuffer.wrap(new String("客戶端請求獲取消息").getBytes()));
        channel.register(selector, SelectionKey.OP_READ);
        /** 外循環,已經發生了SelectionKey數目 */
        while (selector.select() > 0) {
            /* 得到已經被捕獲了的SelectionKey的集合 */
            Iterator iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = null;
                try {
                    key = (SelectionKey) iterator.next();
                    iterator.remove();
                    if (key.isReadable()) {
                        reveice(key);
                    }
                    if (key.isWritable()) {
                        // send(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    try {
                        if (key != null) {
                            key.cancel();
                            key.channel().close();
                        }
                    } catch (ClosedChannelException cex) {
                        e.printStackTrace();
                    }
                }
            }
            /* 內循環完 */
        }
        /* 外循環完 */
    }

    // /*
    // * 接收 用read()讀IO
    // * */
    // synchronized public void reveice2(SelectionKey key) throws IOException {
    // if (key == null)
    // return;
    // // ***用channel.read()獲取消息***//
    // // :接收時需要考慮字節長度
    // DatagramChannel sc = (DatagramChannel) key.channel();
    // String content = "";
    // // create buffer with capacity of 48 bytes
    // ByteBuffer buf = ByteBuffer.allocate(3);// java里一個(utf-8)中文3字節,gbk中文占2個字節
    // int bytesRead = sc.read(buf); //read into buffer.
    //
    // while (bytesRead >0) {
    // buf.flip(); //make buffer ready for read
    // while(buf.hasRemaining()){
    // buf.get(new byte[buf.limit()]); // read 1 byte at a time
    // content += new String(buf.array());
    // }
    // buf.clear(); //make buffer ready for writing
    // bytesRead = sc.read(buf);
    // }
    // System.out.println("接收:" + content);
    // }

    /* 接收 */
    synchronized public void reveice(SelectionKey key) throws IOException {
        String threadName = Thread.currentThread().getName();
        if (key == null)
            return;
        try {
            // ***用channel.receive()獲取消息***//
            // :接收時需要考慮字節長度
            DatagramChannel sc = (DatagramChannel) key.channel();
            String content = "";
            // 第一次接;udp采用數據報模式,發送多少次,接收多少次
            ByteBuffer buf = ByteBuffer.allocate(65507);// java里一個(utf-8)中文3字節,gbk中文占2個字節
            buf.clear();
            SocketAddress address = sc.receive(buf); // read into buffer.
            String clientAddress = address.toString().replace("/", "").split(":")[0];
            String clientPost = address.toString().replace("/", "").split(":")[1];
            System.out.println(threadName + "\t" + address.toString());
            buf.flip(); // make buffer ready for read
            while (buf.hasRemaining()) {
                buf.get(new byte[buf.limit()]);// read 1 byte at a time
                byte[] tmp = buf.array();
                content += new String(tmp);
            }
            buf.clear(); // make buffer ready for writing次
            System.out.println(threadName + "接收:" + content.trim());
            // 第二次接
            content = "";
            ByteBuffer buf2 = ByteBuffer.allocate(65507);// java里一個(utf-8)中文3字節,gbk中文占2個字節
            buf2.clear();
            SocketAddress address2 = sc.receive(buf2); // read into buffer.
            buf2.flip(); // make buffer ready for read
            while (buf2.hasRemaining()) {
                buf2.get(new byte[buf2.limit()]);// read 1 byte at a time
                byte[] tmp = buf2.array();
                content += new String(tmp);
            }
            buf2.clear(); // make buffer ready for writing次
            System.out.println(threadName + "接收2:" + content.trim());

        } catch (PortUnreachableException ex) {
            System.out.println(threadName + "服務端端口未找到!");
        }
        send(2);
    }

    boolean flag = false;

    public void send(int i) {
        if (flag)
            return;
        try {
            // channel.write(ByteBuffer.wrap(new
            // String("客戶端請求獲取消息(第"+i+"次)").getBytes()));
            // channel.register(selector, SelectionKey.OP_READ );
            ByteBuffer buf2 = ByteBuffer.allocate(48);
            buf2.clear();
            buf2.put(("客戶端請求獲取消息(第" + i + "次)").getBytes());
            buf2.flip();
            channel.write(buf2);
            channel.register(selector, SelectionKey.OP_READ);
            // int bytesSent = channel.send(buf2, new
            // InetSocketAddress(serverIp,port)); // 將消息回送給服務端
        } catch (IOException e) {
            e.printStackTrace();
        }
        flag = true;
    }

    int y = 0;

    public void send(SelectionKey key) {
        if (key == null)
            return;
        // ByteBuffer buff = (ByteBuffer) key.attachment();
        DatagramChannel sc = (DatagramChannel) key.channel();
        try {
            sc.write(ByteBuffer.wrap(new String("aaaa").getBytes()));
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        System.out.println("send2() " + (++y));
    }

    /* 發送文件 */
    public void sendFile(SelectionKey key) {
        if (key == null)
            return;
        ByteBuffer buff = (ByteBuffer) key.attachment();
        SocketChannel sc = (SocketChannel) key.channel();
        String data = decode(buff);
        if (data.indexOf("get") == -1)
            return;
        String subStr = data.substring(data.indexOf(" "), data.length());
        System.out.println("截取之后的字符串是 " + subStr);
        FileInputStream fileInput = null;
        try {
            fileInput = new FileInputStream(subStr);
            FileChannel fileChannel = fileInput.getChannel();
            fileChannel.transferTo(0, fileChannel.size(), sc);
            fileChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                fileInput.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new Runnable() {
            public void run() {
                try {
                    new DatagramChannelClientDemo().service();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // new Thread(new Runnable() {
        // public void run() {
        // try {
        // new DatagramChannelClientDemo().service();
        // } catch (IOException e) {
        // e.printStackTrace();
        // }
        // }
        // }).start();

    }
}

 

 

Java NIO中的Buffer用於和NIO通道進行交互。如你所知,數據是從通道讀入緩沖區,從緩沖區寫入到通道中的。交互圖如下:

緩沖區本質上是一塊可以寫入數據,然后可以從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該塊內存。緩沖區實際上是一個容器對象,更直接的說,其實就是一個數組,在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的; 在寫入數據時,它也是寫入到緩沖區中的;任何時候訪問 NIO 中的數據,都是將它放到緩沖區中。而在面向流I/O系統中,所有數據都是直接寫入或者直接將數據讀取到Stream對象中。

在NIO中,所有的緩沖區類型都繼承於抽象類Buffer,最常用的就是ByteBuffer,對於Java中的基本類型,基本都有一個具體Buffer類型與之相對應,它們之間的繼承關系如下圖所示:

 

下面是NIO Buffer相關的話題列表:

  1. Buffer的基本用法
  2. Buffer的capacity,position和limit
  3. Buffer的類型
  4. Buffer的分配
  5. 向Buffer中寫數據
  6. flip()方法
  7. 從Buffer中讀取數據
  8. clear()與compact()方法
  9. mark()與reset()方法
  10. equals()與compareTo()方法

 

Buffer的基本用法

使用Buffer讀寫數據一般遵循以下四個步驟:

  1. 寫入數據到Buffer
  2. 調用flip()方法
  3. 從Buffer中讀取數據
  4. 調用clear()方法或者compact()方法

當向buffer寫入數據時,buffer會記錄下寫了多少數據。一旦要讀取數據,需要通過flip()方法將Buffer從寫模式切換到讀模式。在讀模式下,可以讀取之前寫入到buffer的所有數據。

一旦讀完了所有的數據,就需要清空緩沖區,讓它可以再次被寫入。有兩種方式能清空緩沖區:調用clear()或compact()方法。clear()方法會清空整個緩沖區。compact()方法只會清除已經讀過的數據。任何未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面。

下面是一個使用Buffer的例子:

01 RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt""rw");
02 FileChannel inChannel = aFile.getChannel();
03  
04 //create buffer with capacity of 48 bytes
05 ByteBuffer buf = ByteBuffer.allocate(48);
06  
07 int bytesRead = inChannel.read(buf); //read into buffer.
08 while (bytesRead != -1) {
09  
10   buf.flip();  //make buffer ready for read
11  
12   while(buf.hasRemaining()){
13       System.out.print((char) buf.get()); // read 1 byte at a time
14   }
15  
16   buf.clear(); //make buffer ready for writing
17   bytesRead = inChannel.read(buf);
18 }
19 aFile.close();

示例2:

下面是一個簡單的使用IntBuffer的例子:

package com.dxz.nio;

import java.nio.IntBuffer;

public class TestIntBuffer {  
    public static void main(String[] args) {  
        // 分配新的int緩沖區,參數為緩沖區容量  
        // 新緩沖區的當前位置將為零,其界限(限制位置)將為其容量。它將具有一個底層實現數組,其數組偏移量將為零。  
        IntBuffer buffer = IntBuffer.allocate(8);  
  
        for (int i = 0; i < buffer.capacity(); ++i) {  
            int j = 2 * (i + 1); 
            // 將給定整數寫入此緩沖區的當前位置,當前位置遞增  
            buffer.put(j);  
        }  
  
        // 重設此緩沖區,將限制設置為當前位置,然后將當前位置設置為0  
        buffer.flip();  
  
        // 查看在當前位置和限制位置之間是否有元素  
        while (buffer.hasRemaining()) {  
            // 讀取此緩沖區當前位置的整數,然后當前位置遞增  
            int j = buffer.get();  
            System.out.print(j + "  ");  
        }  
  
    }  
  
}  

結果:

2  4  6  8  10  12  14  16  

 

Buffer的capacity,position和limit

緩沖區本質上是一塊可以寫入數據,然后可以從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該塊內存。

為了理解Buffer的工作原理,需要熟悉它的三個屬性:

  • capacity
  • position
  • limit

position和limit的含義取決於Buffer處在讀模式還是寫模式。不管Buffer處在什么模式,capacity的含義總是一樣的。

這里有一個關於capacity,position和limit在讀寫模式中的說明,詳細的解釋在插圖后面。

capacity

作為一個內存塊,Buffer有一個固定的大小值,也叫“capacity”.你只能往里寫capacity個byte、long,char等類型。一旦Buffer滿了,需要將其清空(通過讀數據或者清除數據)才能繼續寫數據往里寫數據。

position

當你寫數據到Buffer中時,position表示當前的位置。初始的position值為0.當一個byte、long等數據寫到Buffer后, position會向前移動到下一個可插入數據的Buffer單元。position最大可為capacity – 1.

當讀取數據時,也是從某個特定位置讀。當將Buffer從寫模式切換到讀模式,position會被重置為0. 當從Buffer的position處讀取數據時,position向前移動到下一個可讀的位置。

limit

在寫模式下,Buffer的limit表示你最多能往Buffer里寫多少數據。 寫模式下,limit等於Buffer的capacity。

當切換Buffer到讀模式時, limit表示你最多能讀到多少數據。因此,當切換Buffer到讀模式時,limit會被設置成寫模式下的position值。換句話說,你能讀到之前寫入的所有數據(limit被設置成已寫數據的數量,這個值在寫模式下就是position)

Buffer的類型

Java NIO 有以下Buffer類型

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

p<>
如你所見,這些Buffer類型代表了不同的數據類型。換句話說,就是可以通過char,short,int,long,float 或 double類型來操作緩沖區中的字節。

MappedByteBuffer 有些特別,在涉及它的專門章節中再講。

Buffer的分配

要想獲得一個Buffer對象首先要進行分配。 每一個Buffer類都有一個allocate方法。下面是一個分配48字節capacity的ByteBuffer的例子。

1 ByteBuffer buf = ByteBuffer.allocate(48);

這是分配一個可存儲1024個字符的CharBuffer:

1 CharBuffer buf = CharBuffer.allocate(1024);

向Buffer中寫數據

寫數據到Buffer有兩種方式:

  • 從Channel寫到Buffer。
  • 通過Buffer的put()方法寫到Buffer里。

從Channel寫到Buffer的例子

1 int bytesRead = inChannel.read(buf); //read into buffer.

通過put方法寫Buffer的例子:

1 buf.put(127);

put方法有很多版本,允許你以不同的方式把數據寫入到Buffer中。例如, 寫到一個指定的位置,或者把一個字節數組寫入到Buffer。 更多Buffer實現的細節參考JavaDoc。

flip()方法

flip英 [flɪp] 美 [flɪp] 及物動詞 輕彈,輕擊; 按(開關); 快速翻轉; 急揮

flip方法將Buffer從寫模式切換到讀模式。調用flip()方法會將position設回0,並將limit設置成之前position的值。

換句話說,position現在用於標記讀的位置,limit表示之前寫進了多少個byte、char等 —— 現在能讀取多少個byte、char等。

從Buffer中讀取數據

從Buffer中讀取數據有兩種方式:

  1. 從Buffer讀取數據到Channel。
  2. 使用get()方法從Buffer中讀取數據。

從Buffer讀取數據到Channel的例子:

//read from buffer into channel.
int bytesWritten = inChannel.write(buf);
 

使用get()方法從Buffer中讀取數據的例子

byte aByte = buf.get();

get方法有很多版本,允許你以不同的方式從Buffer中讀取數據。例如,從指定position讀取,或者從Buffer中讀取數據到字節數組。更多Buffer實現的細節參考JavaDoc。

rewind()方法

Buffer.rewind()將position設回0,所以你可以重讀Buffer中的所有數據。limit保持不變,仍然表示能從Buffer中讀取多少個元素(byte、char等)。

clear()與compact()方法

一旦讀完Buffer中的數據,需要讓Buffer准備好再次被寫入。可以通過clear()或compact()方法來完成。

如果調用的是clear()方法,position將被設回0,limit被設置成 capacity的值。換句話說,Buffer 被清空了。Buffer中的數據並未清除,只是這些標記告訴我們可以從哪里開始往Buffer里寫數據。

如果Buffer中有一些未讀的數據,調用clear()方法,數據將“被遺忘”,意味着不再有任何標記會告訴你哪些數據被讀過,哪些還沒有。

如果Buffer中仍有未讀的數據,且后續還需要這些數據,但是此時想要先先寫些數據,那么使用compact()方法。

compact()方法將所有未讀的數據拷貝到Buffer起始處。然后將position設到最后一個未讀元素正后面。limit屬性依然像clear()方法一樣,設置成capacity。現在Buffer准備好寫數據了,但是不會覆蓋未讀的數據。

mark()與reset()方法

通過調用Buffer.mark()方法,可以標記Buffer中的一個特定position。之后可以通過調用Buffer.reset()方法恢復到這個position。例如:

1 buffer.mark();
2  
3 //call buffer.get() a couple of times, e.g. during parsing.
4  
5 buffer.reset();  //set position back to mark.

equals()與compareTo()方法

可以使用equals()和compareTo()方法兩個Buffer。

equals()

當滿足下列條件時,表示兩個Buffer相等:

  1. 有相同的類型(byte、char、int等)。
  2. Buffer中剩余的byte、char等的個數相等。
  3. Buffer中所有剩余的byte、char等都相同。

如你所見,equals只是比較Buffer的一部分,不是每一個在它里面的元素都比較。實際上,它只比較Buffer中的剩余元素。

compareTo()方法

compareTo()方法比較兩個Buffer的剩余元素(byte、char等), 如果滿足下列條件,則認為一個Buffer“小於”另一個Buffer:

    1. 第一個不相等的元素小於另一個Buffer中對應的元素 。
    2. 所有元素都相等,但第一個Buffer比另一個先耗盡(第一個Buffer的元素個數比另一個少)。

 

緩沖區分片

  在NIO中,除了可以分配或者包裝一個緩沖區對象外,還可以根據現有的緩沖區對象來創建一個子緩沖區,即在現有緩沖區上切出一片來作為一個新的緩沖區,但現有的緩沖區與創建的子緩沖區在底層數組層面上是數據共享的,也就是說,子緩沖區相當於是現有緩沖區的一個視圖窗口。調用slice()方法可以創建一個子緩沖區,讓我們通過例子來看一下:

package com.dxz.nio;

import java.nio.ByteBuffer;

public class BufferDemo1 {
    static public void main(String args[]) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(10);

        // 緩沖區中的數據0-9
        for (int i = 0; i < buffer.capacity(); ++i) {
            buffer.put((byte) i);
        }

        // 創建子緩沖區
        buffer.position(3);
        buffer.limit(7);
        ByteBuffer slice = buffer.slice();

        // 改變子緩沖區的內容
        for (int i = 0; i < slice.capacity(); ++i) {
            byte b = slice.get(i);
            b *= 10;
            slice.put(i, b);
        }

        buffer.position(0);
        buffer.limit(buffer.capacity());

        while (buffer.remaining() > 0) {
            System.out.println(buffer.get());
        }
    }
}

結果:

0
1
2
30
40
50
60
7
8
9

只讀緩沖區

只讀緩沖區非常簡單,可以讀取它們,但是不能向它們寫入數據。可以通過調用緩沖區的asReadOnlyBuffer()方法,將任何常規緩沖區轉 換為只讀緩沖區,這個方法返回一個與原緩沖區完全相同的緩沖區,並與原緩沖區共享數據,只不過它是只讀的。如果原緩沖區的內容發生了變化,只讀緩沖區的內容也隨之發生變化:

package com.dxz.nio;

import java.nio.ByteBuffer;

public class BufferDemo2 {
    static public void main(String args[]) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(10);

        // 緩沖區中的數據0-9
        for (int i = 0; i < buffer.capacity(); ++i) {
            buffer.put((byte) i);
        }

        // 創建只讀緩沖區
        ByteBuffer readonly = buffer.asReadOnlyBuffer();

        // 改變原緩沖區的內容
        for (int i = 0; i < buffer.capacity(); ++i) {
            byte b = buffer.get(i);
            b *= 10;
            buffer.put(i, b);
        }

        readonly.position(0);
        readonly.limit(buffer.capacity());

        // 只讀緩沖區的內容也隨之改變
        while (readonly.remaining() > 0) {
            System.out.println(readonly.get());
        }
    }
}

結果:

0
10
20
30
40
50
60
70
80
90

  如果嘗試修改只讀緩沖區的內容,則會報ReadOnlyBufferException異常。只讀緩沖區對於保護數據很有用。在將緩沖區傳遞給某個 對象的方法時,無法知道這個方法是否會修改緩沖區中的數據。創建一個只讀的緩沖區可以保證該緩沖區不會被修改。只可以把常規緩沖區轉換為只讀緩沖區,而不能將只讀的緩沖區轉換為可寫的緩沖區。

直接緩沖區

  直接緩沖區是為加快I/O速度,使用一種特殊方式為其分配內存的緩沖區,JDK文檔中的描述為:給定一個直接字節緩沖區,Java虛擬機將盡最大努 力直接對它執行本機I/O操作。也就是說,它會在每一次調用底層操作系統的本機I/O操作之前(或之后),嘗試避免將緩沖區的內容拷貝到一個中間緩沖區中 或者從一個中間緩沖區中拷貝數據。要分配直接緩沖區,需要調用allocateDirect()方法,而不是allocate()方法,使用方式與普通緩沖區並無區別,如下面的拷貝文件示例:

package com.dxz.nio;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.*;

public class BufferDemo3 {
    static public void main(String args[]) throws Exception {
        String infile = "e:\\logs\\test.txt";
        FileInputStream fin = new FileInputStream(infile);
        FileChannel fcin = fin.getChannel();

        String outfile = String.format("e:\\logs\\testcopy.txt");
        FileOutputStream fout = new FileOutputStream(outfile);
        FileChannel fcout = fout.getChannel();

        // 使用allocateDirect,而不是allocate
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

        while (true) {
            buffer.clear();
            int r = fcin.read(buffer);
            if (r == -1) {
                break;
            }
            buffer.flip();
            fcout.write(buffer);
        }
    }
}

內存映射文件I/O

  內存映射文件I/O是一種讀和寫文件數據的方法,它可以比常規的基於流或者基於通道的I/O快的多。內存映射文件I/O是通過使文件中的數據出現為 內存數組的內容來完成的,這其初聽起來似乎不過就是將整個文件讀到內存中,但是事實上並不是這樣。一般來說,只有文件中實際讀取或者寫入的部分才會映射到內存中。如下面的示例代碼:

package com.dxz.nio;

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.*;

public class BufferDemo4 {
    static private final int start = 0;
    static private final int size = 1024;

    static public void main(String args[]) throws Exception {
        RandomAccessFile raf = new RandomAccessFile("e:\\logs\\test.txt", "rw");
        FileChannel fc = raf.getChannel();

        MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, start, size);

        mbb.put(0, (byte) 97);
        mbb.put(1023, (byte) 122);

        raf.close();
    }
}

 參考:http://blog.csdn.net/wuxianglong/article/details/6612263


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM