The ClientCnxn class in the ZooKeeper client


ClientCnxn is a client-side class. It manages the ZooKeeper client's socket I/O and maintains a list of available servers.

// ClientCnxn class
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>(); // packets waiting to be sent
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); // packets sent and awaiting a response
final SendThread sendThread;
final EventThread eventThread;

Packet encapsulates the request, the response, the callback, and related state.

static class Packet {
    // other code omitted
    RequestHeader requestHeader;
    ReplyHeader replyHeader;
    Record request;
    Record response;
    ByteBuffer bb;
    String clientPath;
    String serverPath;
    boolean finished;
    AsyncCallback cb;
    Object ctx;
    WatchRegistration watchRegistration;
    public boolean readOnly;
    Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
           Record request, Record response,
           WatchRegistration watchRegistration) {
        this(requestHeader, replyHeader, request, response,
             watchRegistration, false);
    }
    Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
           Record request, Record response,
           WatchRegistration watchRegistration, boolean readOnly) {
        this.requestHeader = requestHeader;
        this.replyHeader = replyHeader;
        this.request = request;
        this.response = response;
        this.readOnly = readOnly;
        this.watchRegistration = watchRegistration;
    }
}

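ClientCnxn has two threads, SendThread and EventThread. SendThread handles I/O (sending and receiving); EventThread handles event processing.

A Packet moves between outgoingQueue and pendingQueue:

First, the calling thread puts the Packet into outgoingQueue. SendThread takes the Packet from outgoingQueue, sends it, and then puts it into pendingQueue to wait for a response. When the response arrives, SendThread parses it and removes the Packet from pendingQueue.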
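The flow described above can be modeled with two plain queues. The following is a minimal sketch, not the real ClientCnxn: `SketchPacket`, `TwoQueueSketch`, and their methods are hypothetical names introduced for illustration, and only the xid of each packet is modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for ClientCnxn.Packet; only the xid is modeled.
class SketchPacket {
    final int xid;
    boolean finished;
    SketchPacket(int xid) { this.xid = xid; }
}

// Hypothetical model of the two queues; not the real ClientCnxn.
class TwoQueueSketch {
    private final Deque<SketchPacket> outgoingQueue = new ArrayDeque<>();
    private final Deque<SketchPacket> pendingQueue = new ArrayDeque<>();

    // Calling thread: enqueue a request for sending.
    synchronized void submit(SketchPacket p) {
        outgoingQueue.addLast(p);
    }

    // Send side: take the oldest unsent packet, "send" it, then park it in pendingQueue.
    synchronized SketchPacket sendNext() {
        SketchPacket p = outgoingQueue.pollFirst();
        if (p != null) {
            pendingQueue.addLast(p);
        }
        return p;
    }

    // Receive side: a response is matched against the oldest pending packet.
    synchronized SketchPacket onResponse(int xid) {
        SketchPacket p = pendingQueue.pollFirst();
        if (p == null) {
            throw new IllegalStateException("Nothing pending, but got xid " + xid);
        }
        if (p.xid != xid) {
            throw new IllegalStateException("Xid out of order");
        }
        p.finished = true;
        return p;
    }

    synchronized int outgoingSize() { return outgoingQueue.size(); }
    synchronized int pendingSize() { return pendingQueue.size(); }
}
```

Submitting packets with xids 1 and 2, calling `sendNext()` twice, and then calling `onResponse(1)` leaves one packet in the pending queue; calling `onResponse(2)` first instead would raise the out-of-order error.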

Example code:

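import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class MyZK {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 60000000, new MyWatcher());

        List<ACL> list = new ArrayList<ACL>(1);
        list.add(new ACL(Perms.ALL, Ids.ANYONE_ID_UNSAFE));
        // CreateResponse (the parent node /zhang must already exist)
        String create = zk.create("/zhang/hello", "hello".getBytes(), list, CreateMode.PERSISTENT);
        System.out.println(create);

        // GetDataResponse
        // Passing true registers the default watcher (MyWatcher here);
        // the server sends a notification when this node is deleted
        byte[] data = zk.getData("/zhang/hello", true, null);
        System.out.println(new String(data));

        Stat stat = zk.exists("/zhang/hello", false);
        System.out.println(stat);

        zk.delete("/zhang/hello", -1);
    }

    static class MyWatcher implements Watcher {
        public void process(WatchedEvent event) {
            System.out.println(event);
        }
    }
}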

getData call stack:

// ZooKeeper.getData
public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException
 {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();    // request header
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest(); // request body
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse(); // response
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); // send the request and block until the response arrives
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}

The calling thread puts the request into outgoingQueue and then blocks:

public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // create the Packet and put it into outgoingQueue
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
    // acquire the packet's monitor first
    synchronized (packet) {
        while (!packet.finished) {
            // wait, releasing the monitor while blocked
            packet.wait();
        }
    }
    return r;
}
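The blocking behaviour of submitRequest is the standard Java monitor handshake: the caller waits on the packet in a loop, and another thread sets the flag and calls notifyAll. Below is a small self-contained sketch of that handshake. `WaitablePacket`, `BlockingSubmitSketch`, and the `result` field are hypothetical names used only for illustration, and a helper thread stands in for SendThread.

```java
// Hypothetical minimal packet: only the completion flag and a result are modeled.
class WaitablePacket {
    boolean finished;   // guarded by the packet's own monitor
    String result;
}

class BlockingSubmitSketch {
    // Caller side: block until another thread marks the packet finished.
    static String awaitResult(WaitablePacket packet) throws InterruptedException {
        synchronized (packet) {
            while (!packet.finished) {   // the loop guards against spurious wakeups
                packet.wait();           // releases the monitor while waiting
            }
            return packet.result;
        }
    }

    // I/O side: publish the result and wake every waiter.
    static void complete(WaitablePacket packet, String result) {
        synchronized (packet) {
            packet.result = result;
            packet.finished = true;
            packet.notifyAll();
        }
    }

    // Runs one request end to end; the helper thread plays the role of SendThread.
    static String roundTrip(String payload) {
        WaitablePacket packet = new WaitablePacket();
        Thread io = new Thread(() -> complete(packet, "echo:" + payload));
        io.start();
        try {
            String result = awaitResult(packet);
            io.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because the flag is checked inside the synchronized block before waiting, the caller does not miss the wakeup even if the helper thread completes the packet before the caller starts to wait.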

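SendThread handles the I/O: it sends requests and receives responses.

Sending: it sends the packets in outgoingQueue and then moves each sent packet to pendingQueue to wait for its response.

Receiving: it parses the response and removes the packet from pendingQueue.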

// ClientCnxnSocketNIO.doIO
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
  throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        // read the response
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount++;
                readLength();
            } else if (!initialized) {
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // parse the response
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    if (sockKey.isWritable()) {
        // send the request
        synchronized(outgoingQueue) {
            Packet p = findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    sentCount++;
                    // remove the packet from outgoingQueue
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            // add the packet to pendingQueue
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                disableWrite();
            } else {
                // Just in case
                enableWrite();
            }
        }
    }
}
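The read half of doIO switches between a 4-byte lenBuffer and a body buffer sized from that length, which is a length-prefixed framing scheme. The sketch below reproduces only that toggle, fed from byte arrays instead of a SocketChannel. `FrameDecoderSketch`, `feed`, `frames`, and `frame` are hypothetical names for illustration, and the length check is simplified compared with the real readLength.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder mirroring the lenBuffer / incomingBuffer toggle in doIO.
class FrameDecoderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    private final List<byte[]> frames = new ArrayList<>();

    // Feed an arbitrary chunk of bytes, as a single socket read might deliver.
    void feed(byte[] chunk) {
        ByteBuffer src = ByteBuffer.wrap(chunk);
        while (true) {
            // Copy as much as fits into the buffer currently being filled.
            while (src.hasRemaining() && incomingBuffer.hasRemaining()) {
                incomingBuffer.put(src.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // buffer not full yet: wait for more bytes
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // The length prefix is complete: allocate a buffer for the body.
                int len = incomingBuffer.getInt();
                if (len < 0) {
                    throw new IllegalArgumentException("Negative frame length: " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // The body is complete: hand it over and go back to reading a length.
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() { return frames; }

    // Helper for building test input: 4-byte big-endian length followed by the payload.
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the first three bytes of `frame("hello")` produces no frame; feeding the rest produces exactly one, which is why doIO only acts when `incomingBuffer.hasRemaining()` is false.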

Parsing the response:

// SendThread class
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // other code omitted
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header");

    // xid -1 means a notification
    if (replyHdr.getXid() == -1) {
        // deserialize into a WatcherEvent object
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        // put it on eventThread's queue
        eventThread.queueEvent(we);
        return;
    }
    // a normal response
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order.");
        }

        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // deserialize the response body into packet.response
            packet.response.deserialize(bbia, "response");
        }
    } finally {
        finishPacket(packet);
    }
}
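The notification branch of readResponse converts the server path in the event back to a client path by stripping the chroot prefix. That conversion can be isolated as a pure function. The sketch below follows the same three cases as the code above; `ChrootPathSketch` and `toClientPath` are hypothetical names, and the "too short" case returns the path unchanged because the original only logs a warning there.

```java
// Hypothetical helper isolating the chroot handling from readResponse.
class ChrootPathSketch {
    // Maps a server-side path to the path the client sees under its chroot.
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;                  // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                         // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath;                      // too short for the chroot; left unchanged
    }
}
```

With a chroot of `/app`, the server path `/app/zhang/hello` maps to `/zhang/hello` and `/app` maps to `/`.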

The calling thread is still blocked at this point, so something has to wake it up:

private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            // wake up the calling thread
            p.notifyAll();
        }
    } else {
        p.finished = true;
        eventThread.queuePacket(p);
    }
}
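finishPacket has two completion paths: a synchronous request (no callback) is completed by notifying the waiting thread, while an asynchronous request is handed to the event thread so the callback does not run on the I/O thread. A rough sketch of that split follows. `CompletablePacket`, `FinishPacketSketch`, `dispatchOne`, and the `Consumer<Integer>` callback are hypothetical stand-ins for Packet, ClientCnxn, EventThread's loop, and AsyncCallback.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical packet: a completion flag, an error code, and an optional callback.
class CompletablePacket {
    boolean finished;
    int err;
    Consumer<Integer> cb;   // stand-in for AsyncCallback; null means a synchronous call
}

class FinishPacketSketch {
    // Stand-in for the event thread's queue of packets awaiting callback dispatch.
    final BlockingQueue<CompletablePacket> eventQueue = new LinkedBlockingQueue<>();

    void finishPacket(CompletablePacket p) {
        if (p.cb == null) {
            // Synchronous path: wake the thread blocked in wait() on this packet.
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            // Asynchronous path: defer the callback to the event thread.
            p.finished = true;
            eventQueue.add(p);
        }
    }

    // One step of an event loop: run the callback of the next queued packet, if any.
    boolean dispatchOne() {
        CompletablePacket p = eventQueue.poll();
        if (p == null) {
            return false;
        }
        p.cb.accept(p.err);
        return true;
    }
}
```

Keeping callbacks off the I/O thread means a slow callback delays other callbacks but not the reading and writing of the socket.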

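Question:

SendThread sends the Packets in outgoingQueue in order and then puts them into pendingQueue, but will the responses necessarily arrive in that order? In a network, a packet sent first is not always the first to arrive.

Is ZooKeeper really that confident that responses arrive in the same order as the entries in pendingQueue?

The comment in readResponse ("Since requests are processed in order, we better get a response to the first request!") states the assumption the client relies on. The requests of a session travel over a single TCP connection, and TCP delivers its byte stream in order even when individual IP packets are reordered, so replies would only arrive out of order if the server answered out of order. The client treats that case as an error ("Xid out of order.").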

