muduo源碼TcpConnection類與TcpServer類詳解


 

簡介

TcpConnection類,TcpServer類,Acceptor類是構成非阻塞TCP網絡編程庫的重要組成部分。

本文主要針對muduo源碼進行分析。(Acceptor類在上篇中已經分析過了)

muduo網絡庫的單線程設計方式,即一個EventLoop 處理所有的事件,包括鏈接的建立、IO、計算、以及鏈接的銷毀,多線程的方式即每一個連接一個EventLoop。(one loop per thread)

TcpServer

多線程的muduo::TcpServer,主要通過添加一個EventLoopThreadPool 事件循環線程池實現,新建TcpConnection時從event loop pool里挑選一個loop給TcpConnection用。 也就是說多線程TcpServer自己的EventLoop只用來接受新連接, 而新連接會用其他EventLoop來執行IO。 (單線程TcpServer的EventLoop是與TcpConnection共享的。)

EventLoopThreadPooll 按one loop per thread的思想實現多線程TcpServer, 此時主線程循環只負責TCP鏈接的建立,及任務的分配,需要讓哪個線程干活, 就把timer或IO(如TCP連接) 注冊到那個線程的循環里即可;對實時性有要求的connection可以單獨用一個線程; 數據量大的connection可以獨占一個線程;並把數據處理任務分攤到另幾個計算線程中(用線程池);其他次要的輔助性connections共享一個線程。

 

線程池設計模式

池是一種設計模式,線程是是一種資源,線程池設計模式通常是事先申請一定的資源,當需要使用時,去資源池中申請資源,用完后再放回資源池中。EventLoopThreadPool 正是借鑒了這種設計模式,雖然使用的線程並不會放回資源池中,但是本身的一個EventLoop即一個Reactor,本身就具備等待機制了。

 

TcpServer每次新建一條TcpConnection就會通過EventLoopThreadPool::getNextLoop()方法來取一個EventLoop, 目前的getNextLoop()只是循環的從池中取一條loop,如果提供給每條TcpConncetion的是均等的服務,那么這樣就能很均勻的分配系統的資源了。

 

TcpServer的工作方式取決於EventLoopThreadPool中線程的創建數量。

0 意味着所有的I/O 都在TcpServer的主事件循環中,不會創建新的線程。

1 意味着所有的 I/O 在另一個線程中 ,TcpServer的主線程只負責建立連接。

N 意味着新的連接會被循環的分配到N條線程中工作。

下面是源碼(帶有注釋)

TcpServer.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
/* 這個類相當於把TcpConnection以及Accept類整合起來,完全能夠實現Tcp通信,也就是socket函數都實現了
 * 總結一下整個TCP通信過程:
 * 一個TcpServer類中,有Acceptor,EventLoopThreadPool各一個,以及多個TcpConnection類的指針,
 * 在TcpServer類的啟動函數中,先開啟EventLoopThreadPool線程池,然后將Acceptor監聽函數放入eventloop中去執行
 * 在TcpServer類的構造函數中,就已經把一個成功連接的回調函數綁定在Acceptor類的連接回調函數中,如果Acceptor監聽
 * 到有連接進來,先調監聽socket描述符的回調函數,把這個連接accept進來,然后再調用newConnectionCallback_函數
 * 來處理連接,每個連接都有一個對應的TcpConnection類來作為緩沖區
 * */
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>

#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

namespace muduo {
    namespace net {

        class Acceptor;

        class EventLoop;

        class EventLoopThreadPool;

///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
        class TcpServer : boost::noncopyable {
        public:
            typedef boost::function<void(EventLoop *)> ThreadInitCallback;

            //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
            TcpServer(EventLoop *loop,
                      const InetAddress &listenAddr,
                      const string &nameArg);

            ~TcpServer();  // force out-line dtor, for scoped_ptr members.

            const string &hostport() const { return hostport_; }

            const string &name() const { return name_; }

            /// Set the number of threads for handling input.
            ///
            /// Always accepts new connection in loop's thread.
            /// Must be called before @c start
            /// @param numThreads
            /// - 0 means all I/O in loop's thread, no thread will created.
            ///   this is the default value.
            /// - 1 means all I/O in another thread.
            /// - N means a thread pool with N threads, new connections
            ///   are assigned on a round-robin basis.
            void setThreadNum(int numThreads);

            void setThreadInitCallback(
                    const ThreadInitCallback &cb) { threadInitCallback_ = cb; }//這個函數會作為EventLoopThreadPool::start的入口參數

            /// Starts the server if it's not listenning.
            ///
            /// It's harmless to call it multiple times.
            /// Thread safe.
            void start();

            /// Set connection callback.
            /// Not thread safe.
            // 設置連接到來或者連接關閉回調函數,這個函數指針會賦值給TcpConnection::connectionCallback_函數,就是在連接建立之后,和連接斷開之前會調用
            void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }

            /// Set message callback.
            /// Not thread safe.
            //  設置消息到來回調函數,這個函數指針在TcpConnection::handleread函數中調用,也就是TcpConnection的Channel的讀函數的一部分
            void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }

            /// Set write complete callback.
            /// Not thread safe.
            /// 在發送完消息以后調用,這個函數指針會賦值給TcpConnection::writeCompleteCallback_函數
            void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }


        private:
            /// Not thread safe, but in loop
            void newConnection(int sockfd, const InetAddress &peerAddr);
            //這個函數會賦值給Acceptor::newConnectionCallback_,在新連接建立以后調用
            /// Thread safe.
            /// 會賦值給TcpConnection::closeCallback_函數,也就是當連接描述符關閉以后調用這個
            void removeConnection(const TcpConnectionPtr &conn);

            /// Not thread safe, but in loop,在上面這個函數removeConnection中調用
            void removeConnectionInLoop(const TcpConnectionPtr &conn);

            typedef std::map <string, TcpConnectionPtr> ConnectionMap;

            EventLoop *loop_;  // the acceptor loop
            const string hostport_;        // 服務的ip:端口
            const string name_;            // 服務名
            boost::scoped_ptr <Acceptor> acceptor_; // avoid revealing Acceptor
            boost::scoped_ptr <EventLoopThreadPool> threadPool_;
            ConnectionCallback connectionCallback_;
            MessageCallback messageCallback_;
            WriteCompleteCallback writeCompleteCallback_;        // 數據發送完畢,會回調此函數
            ThreadInitCallback threadInitCallback_;    // IO線程池中的線程在進入事件循環前,會回調用此函數
            bool started_;
            // always in loop thread
            int nextConnId_;                // 下一個連接ID,每次增加一個就加1
            ConnectionMap connections_;    // 連接列表
        };

    }
}

#endif  // MUDUO_NET_TCPSERVER_H

TcpServer.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpServer.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr,
                     const string &nameArg)
        : loop_(CHECK_NOTNULL(loop)),
          hostport_(listenAddr.toIpPort()),
          name_(nameArg),
          acceptor_(new Acceptor(loop, listenAddr)),
          threadPool_(new EventLoopThreadPool(loop)),
          connectionCallback_(defaultConnectionCallback),
          messageCallback_(defaultMessageCallback),
          started_(false),
          nextConnId_(1) {
    // Acceptor::handleRead函數中會回調用TcpServer::newConnection
    // _1對應的是socket文件描述符,_2對應的是對等方的地址(InetAddress)
    acceptor_->setNewConnectionCallback(
            boost::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer::~TcpServer() {
    loop_->assertInLoopThread();
    LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

    for (ConnectionMap::iterator it(connections_.begin());
         it != connections_.end(); ++it) {
        TcpConnectionPtr conn = it->second;
        it->second.reset();        // 釋放當前所控制的對象,引用計數減一
        conn->getLoop()->runInLoop(
                boost::bind(&TcpConnection::connectDestroyed, conn));
        conn.reset();            // 釋放當前所控制的對象,引用計數減一
    }
}

void TcpServer::setThreadNum(int numThreads) {
    assert(0 <= numThreads);
    threadPool_->setThreadNum(numThreads);
}

// 該函數多次調用是無害的
// 該函數可以跨線程調用
void TcpServer::start() {
    if (!started_) {
        started_ = true;
        threadPool_->start(threadInitCallback_);
    }

    if (!acceptor_->listenning()) {
        // get_pointer返回原生指針
        loop_->runInLoop(
                boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
    }
}

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)//建立新連接以后的回調函數
{
    loop_->assertInLoopThread();
    // 按照輪叫的方式選擇一個EventLoop
    EventLoop *ioLoop = threadPool_->getNextLoop();
    char buf[32];
    snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);//buf的內容是 ip:端口#nextConnId_
    ++nextConnId_;
    string connName = name_ + buf;

    LOG_INFO << "TcpServer::newConnection [" << name_
             << "] - new connection [" << connName
             << "] from " << peerAddr.toIpPort();
    InetAddress localAddr(sockets::getLocalAddr(sockfd));
    // FIXME poll with zero timeout to double confirm the new connection
    // FIXME use make_shared if necessary
    /*TcpConnectionPtr conn(new TcpConnection(loop_,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));*/

    TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                            connName,
                                            sockfd,
                                            localAddr,
                                            peerAddr));

    LOG_TRACE << "[1] usecount=" << conn.use_count();
    connections_[connName] = conn;//將連接名和TCPConnection的指針拷貝進連接列表中,這樣就有兩個shared_ptr指針指向conn了,
    //如果沒有這一句程序,這個conn在newConnection函數執行結束以后就會析構掉,所以真正要刪除時,也要把這個列表中的對應元素也刪除了。
    LOG_TRACE << "[2] usecount=" << conn.use_count();
    //設置回調函數
    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);//無論是否非空,都可以先設置,在使用之前會有判斷

    conn->setCloseCallback(
            boost::bind(&TcpServer::removeConnection, this, _1));

    // conn->connectEstablished();
    ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
    //個人理解bind在綁定類成員函數時,后面跟的參數一定比輸入參數多一個,就是一個類指針,表明這個函數屬於那個類變量的,
    //一般都使用this,而這里是用的TcpConnectionPtr
    LOG_TRACE << "[5] usecount=" << conn.use_count();

}

void TcpServer::removeConnection(const TcpConnectionPtr &conn) {
    /*
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();


  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();

  (void)n;
  assert(n == 1);
  
  loop_->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count();
  */

    loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));

}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)//就是把TcpConnection從Eventloop中移除
{
    loop_->assertInLoopThread();
    LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
             << "] - connection " << conn->name();


    LOG_TRACE << "[8] usecount=" << conn.use_count();
    size_t n = connections_.erase(conn->name());
    LOG_TRACE << "[9] usecount=" << conn.use_count();

    (void) n;
    assert(n == 1);

    EventLoop *ioLoop = conn->getLoop();
    ioLoop->queueInLoop(
            boost::bind(&TcpConnection::connectDestroyed, conn));

    //loop_->queueInLoop(
    //    boost::bind(&TcpConnection::connectDestroyed, conn));
    LOG_TRACE << "[10] usecount=" << conn.use_count();


}

 

TcpConnection

TcpConnection類主要負責封裝一次TCP連接,向Channel類注冊回調函數(可讀、可寫、可關閉、錯誤處理),將來當Channel類上的事件發生時,調用相應的回調函數進行數據收發或者錯誤處理。

  TcpConnection是使用shared_ptr來管理的類,因為它的生命周期模糊。TcpConnection表示已經建立或正在建立的連接,建立連接后,用戶只需要在上層類如TcpServer中設置連接到來和消息到來的處理函數,繼而回調TcpConnection中的 setConnectionCallback和setMessageCallback函數,實現對事件的處理。用戶需要關心的事件是有限的,其他都由網絡庫負責。

  TcpConnection中封裝了InputBuffer和OutputBuffer,用來表示應用層的緩沖區。在發送數據時,如果不能一次將Buffer中的數據發送完畢,它還會繼續關注Channel中的可寫事件,當sockfd可寫時,會再次發送。

前面提到TcpConnection的生存期模糊,主要是因為我們不能在TcpServer中直接erase掉TcpConnection對象,因為此時有可能Channel中的handleEvent還在執行,如果析構TcpConnection對象,那么他的成員channel_也會被析構,會導致core dump。也就是說我們需要TcpConnection 對象生存期要長於handleEvent() 函數,直到執行完connectDestroyed() 后才會析構。

 

斷開連接:

TcpConnection的斷開是采用被動方式,即對方先關閉連接,本地read(2)返回0后,調用順序如下:

handleClose()->TcpServer::removeConnection->TcpConnection::connectDestroyed()。

連接關閉時序圖:

        

 

 

 

當連接到來,創建一個TcpConnection對象,立刻用shared_ptr來管理,引用計數為1,在Channel中維護一個weak_ptr(tie_),將這個shared_ptr對象賦值給_tie,引用計數仍然為1。當連接關閉時,在handleEvent中,將tie_提升,得到一個shard_ptr對象,引用計數就變成了2。當shared_ptr的計數不為0時,TcpConnection不會被銷毀。

 TcpConnection.h源碼分析

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.
/*我所理解的這個類,主要用來和buffer類一起作為非阻塞IO的一個讀取橋梁,其中主要封裝的函數是從文件描述符中讀取傳輸的數據到
 *接受緩沖區中,或者把規定數據,或者觸發寫事件的輸出緩沖區的數據寫入對應的文件描述符中。
 */
#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>

#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

namespace muduo {
    namespace net {

        class Channel;

        class EventLoop;

        class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
        class TcpConnection : boost::noncopyable,public boost::enable_shared_from_this<TcpConnection> {
        public:
            /// Constructs a TcpConnection with a connected sockfd
            ///
            /// User should not create this object.
            TcpConnection(EventLoop *loop,const string &name,int sockfd,
                          const InetAddress &localAddr,
                          const InetAddress &peerAddr);

            ~TcpConnection();

            EventLoop *getLoop() const { return loop_; }//獲取當前TcpConnection所在的Eventloop
            const string &name() const { return name_; }//
            const InetAddress &localAddress() { return localAddr_; }

            const InetAddress &peerAddress() { return peerAddr_; }

            bool connected() const { return state_ == kConnected; }

            // void send(string&& message); // C++11
            void send(const void *message, size_t len);

            void send(const StringPiece &message);

            // void send(Buffer&& message); // C++11
            void send(Buffer *message);  // this one will swap data
            void shutdown(); // NOT thread safe, no simultaneous calling
            void setTcpNoDelay(bool on);

            void setContext(const boost::any &context) { context_ = context; }

            const boost::any &getContext() const//得到常數值的context_
            { return context_; }

            boost::any *getMutableContext()//得到可以改變的context_
            { return &context_; }

            void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
            //在handleClose,connectEstablished,connectDestroyed中調用,個人理解這個連接回調函數主要起到
            //顯示作用,就是在和連接描述符建立連接或者關閉連接前,顯示連接狀態的,表明還在連接中

            void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
            //在handleRead函數當中調用了,也可以理解為channel_寫函數的一部分

            void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }
            //在handleWrite和sendInLoop寫函數中,寫完調用的

            void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark) {
                highWaterMarkCallback_ = cb;
                highWaterMark_ = highWaterMark;
            }//都在sendInLoop中調用了

            Buffer *inputBuffer() { return &inputBuffer_; }

            /// Internal use only.
            void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; }//在handleClose函數中調用

            // called when TcpServer accepts a new connection
            void connectEstablished();   // should be called only once
            // called when TcpServer has removed me from its map
            void connectDestroyed();  // should be called only once

        private:
            enum StateE {
                kDisconnected, kConnecting, kConnected, kDisconnecting
            };

            void handleRead(Timestamp receiveTime);//綁定channel_的讀函數
            void handleWrite();//綁定channel_的寫函數
            void handleClose();//綁定channel_的關閉函數,同時也在handleRead中調用
            void handleError();////綁定channel_的錯誤函數
            void sendInLoop(const StringPiece &message);

            void sendInLoop(const void *message, size_t len);

            void shutdownInLoop();

            void setState(StateE s) { state_ = s; }//設置狀態位

            EventLoop *loop_;            // 所屬EventLoop
            string name_;                // 連接名
            StateE state_;  // FIXME: use atomic variable
            // we don't expose those classes to client.
            //連接狀態
            boost::scoped_ptr <Socket> socket_;
            boost::scoped_ptr <Channel> channel_;
            //channel_在TCPServer中綁定了連接套接字,就是能夠實現通信的那個connfd套接字,這個套接字是從Socket::accept函數得到的
            //在Tcpclient綁定的是創建的套接字,因為客戶端只需要一個套接字就可以了,這個套接字是從socket()函數中得到的
            InetAddress localAddr_;//當前服務端的地址
            InetAddress peerAddr_;//當前建立連接的客戶端地址
            ConnectionCallback connectionCallback_;
            MessageCallback messageCallback_;
            WriteCompleteCallback writeCompleteCallback_;        // 數據發送完畢回調函數,即所有的用戶數據都已拷貝到內核緩沖區時回調該函數
            // outputBuffer_被清空也會回調該函數,可以理解為低水位標回調函數
            HighWaterMarkCallback highWaterMarkCallback_;        // 高水位標回調函數
            CloseCallback closeCallback_;
            size_t highWaterMark_;        // 高水位標
            Buffer inputBuffer_;            // 應用層接收緩沖區
            Buffer outputBuffer_;            // 應用層發送緩沖區
            boost::any context_;            // 綁定一個未知類型的上下文對象,一般用來放HttpContext類的
        };

        typedef boost::shared_ptr <TcpConnection> TcpConnectionPtr;

    }
}

#endif  // MUDUO_NET_TCPCONNECTION_H

TcpConnection.cc源碼分析
// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpConnection.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <errno.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void muduo::net::defaultConnectionCallback(const TcpConnectionPtr &conn)//默認的連接回調函數,輸出連接狀態
{
    LOG_TRACE << conn->localAddress().toIpPort() << " -> "
              << conn->peerAddress().toIpPort() << " is "
              << (conn->connected() ? "UP" : "DOWN");
}

void muduo::net::defaultMessageCallback(const TcpConnectionPtr &, Buffer *buf, Timestamp)
//默認的有消息時執行的回調函數,把緩沖區讀指針和寫指針回到初始化的位置
//可以理解為將緩沖區清零
{
    buf->retrieveAll();
}

TcpConnection::TcpConnection(EventLoop *loop, const string &nameArg, int sockfd,
                             const InetAddress &localAddr,
                             const InetAddress &peerAddr)
        : loop_(CHECK_NOTNULL(loop)), // 所屬EventLoop
          name_(nameArg),// 連接名
          state_(kConnecting),//連接狀態
          socket_(new Socket(sockfd)),//連接套接字
          channel_(new Channel(loop, sockfd)),
          //channel_在TCPServer中綁定了連接套接字,就是能夠實現通信的那個connfd套接字,這個套接字是從Socket::accept函數得到的
          //在Tcpclient綁定的是創建的套接字,因為客戶端只需要一個套接字就可以了,這個套接字是從socket()函數中得到的
          localAddr_(localAddr),//當前服務端的地址
          peerAddr_(peerAddr),//當前建立連接的客戶端地址
          highWaterMark_(64 * 1024 * 1024) {
    // 通道可讀事件到來的時候,回調TcpConnection::handleRead,_1是事件發生時間
    channel_->setReadCallback(boost::bind(&TcpConnection::handleRead, this, _1));
    // 通道可寫事件到來的時候,回調TcpConnection::handleWrite
    channel_->setWriteCallback(boost::bind(&TcpConnection::handleWrite, this));
    // 連接關閉,回調TcpConnection::handleClose
    channel_->setCloseCallback(boost::bind(&TcpConnection::handleClose, this));
    // 發生錯誤,回調TcpConnection::handleError
    channel_->setErrorCallback(boost::bind(&TcpConnection::handleError, this));
    LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this<< " fd=" << sockfd;
    socket_->setKeepAlive(true);//定期探測連接是否存在,類似於心跳包
}

TcpConnection::~TcpConnection() {
    LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
              << " fd=" << channel_->fd();
}

// 線程安全,可以跨線程調用
void TcpConnection::send(const void *data, size_t len) {
    if (state_ == kConnected) {
        if (loop_->isInLoopThread()) {
            sendInLoop(data, len);
        } else {
            string message(static_cast<const char *>(data), len);
            loop_->runInLoop(boost::bind(&TcpConnection::sendInLoop,this,message));
        }
    }
}

// 線程安全,可以跨線程調用
void TcpConnection::send(const StringPiece &message) {
    if (state_ == kConnected) {
        if (loop_->isInLoopThread()) {
            sendInLoop(message);
        } else {
            loop_->runInLoop(boost::bind(&TcpConnection::sendInLoop,this,message.as_string()));
            //std::forward<string>(message)));
        }
    }
}

// 線程安全,可以跨線程調用
void TcpConnection::send(Buffer *buf) {
    if (state_ == kConnected) {
        if (loop_->isInLoopThread()) {
            sendInLoop(buf->peek(), buf->readableBytes());
            buf->retrieveAll();
        } else {
            loop_->runInLoop(boost::bind(&TcpConnection::sendInLoop,this,buf->retrieveAllAsString()));
            //std::forward<string>(message)));
        }
    }
}

void TcpConnection::sendInLoop(const StringPiece &message) {
    sendInLoop(message.data(), message.size());
}

//???這個函數和handlewrite函數都是向文件描述符中寫入,有什么區別呢?
void TcpConnection::sendInLoop(const void *data, size_t len) {
    /*
    loop_->assertInLoopThread();
    sockets::write(channel_->fd(), data, len);
    */
    loop_->assertInLoopThread();
    ssize_t nwrote = 0;
    size_t remaining = len;
    bool error = false;
    if (state_ == kDisconnected) {
        LOG_WARN << "disconnected, give up writing";
        return;
    }
    // if no thing in output queue, try writing directly
    // 通道沒有關注可寫事件並且發送緩沖區沒有數據,直接write
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
        nwrote = sockets::write(channel_->fd(), data, len);
        if (nwrote >= 0) {
            remaining = len - nwrote;
            // 寫完了,回調writeCompleteCallback_
            if (remaining == 0 && writeCompleteCallback_) {
                loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
            }
        } else // nwrote < 0
        {
            nwrote = 0;
            if (errno != EWOULDBLOCK) {
                LOG_SYSERR << "TcpConnection::sendInLoop";
                if (errno == EPIPE) // FIXME: any others?
                {
                    error = true;
                }
            }
        }
    }

    assert(remaining <= len);
    // 沒有錯誤,並且還有未寫完的數據(說明內核發送緩沖區滿,要將未寫完的數據添加到output buffer中)
    if (!error && remaining > 0) {
        LOG_TRACE << "I am going to write more data";
        size_t oldLen = outputBuffer_.readableBytes();
        // 如果超過highWaterMark_(高水位標),回調highWaterMarkCallback_
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_) {
            loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char *>(data) + nwrote, remaining);//將剩余數據存入應用層發送緩沖區
        if (!channel_->isWriting()) {
            channel_->enableWriting();        // 關注POLLOUT事件
        }
    }
}

void TcpConnection::shutdown()//關閉連接
{
    // FIXME: use compare and swap
    if (state_ == kConnected) {
        setState(kDisconnecting);
        // FIXME: shared_from_this()?
        loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
    }
}

void TcpConnection::shutdownInLoop()//在loop中關閉寫半邊,還是可以讀數據
{
    loop_->assertInLoopThread();
    if (!channel_->isWriting()) {
        // we are not writing
        socket_->shutdownWrite();
    }
}

void TcpConnection::setTcpNoDelay(bool on)//設置TCP延遲連接
{
    socket_->setTcpNoDelay(on);
}

void TcpConnection::connectEstablished()//這個建立連接是TcpConnection類中的channel加入到對應的比如Tcpclient或者Tcpserver類所屬的eventloop中
{
    loop_->assertInLoopThread();
    assert(state_ == kConnecting);//設置正在連接狀態
    setState(kConnected);
    LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
    channel_->tie(shared_from_this());
    channel_->enableReading();    // TcpConnection所對應的通道加入到Poller關注

    connectionCallback_(shared_from_this());
    LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}

void TcpConnection::connectDestroyed()//取消連接,從對應的Eventloop上的epoll隊列中去除
{
    loop_->assertInLoopThread();
    if (state_ == kConnected) {
        setState(kDisconnected);
        channel_->disableAll();

        connectionCallback_(shared_from_this());
    }
    channel_->remove();//將channel從epoll隊列中移除
}

void TcpConnection::handleRead(Timestamp receiveTime)//處理讀事件的函數
{
    /*
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
    if (n > 0)
    {
      messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
      handleClose();
    }
    else
    {
      errno = savedErrno;
      LOG_SYSERR << "TcpConnection::handleRead";
      handleError();
    }
    */

    /*
    loop_->assertInLoopThread();
    int savedErrno = 0;
    char buf[65536];
    ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
    if (n > 0)
    {
      messageCallback_(shared_from_this(), buf, n);
    }
    else if (n == 0)
    {
      handleClose();
    }
    else
    {
      errno = savedErrno;
      LOG_SYSERR << "TcpConnection::handleRead";
      handleError();
    }
    */
    loop_->assertInLoopThread();
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);//直接將數據讀到inputBuffer_緩沖區
    if (n > 0) {
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    } else if (n == 0) {
        handleClose();//如果讀到的數據為0,就自動退出
    } else {
        errno = savedErrno;
        LOG_SYSERR << "TcpConnection::handleRead";
        handleError();
    }
}

// 監聽到寫事件了,就調用這個函數,此時服務器已經把要寫的內容寫到outputBuffer_中去了,所以要寫的內容從讀指針處開始
void TcpConnection::handleWrite() {
    loop_->assertInLoopThread();
    if (channel_->isWriting())//查看是否有寫事件需要關注
    {
        ssize_t n = sockets::write(channel_->fd(),
                                   outputBuffer_.peek(),
                                   outputBuffer_.readableBytes());//寫到文件描述符中去
        if (n > 0) {
            outputBuffer_.retrieve(n);//處理讀寫指針
            if (outputBuffer_.readableBytes() == 0)     // 發送緩沖區已清空
            {
                channel_->disableWriting();        // 停止關注POLLOUT事件,以免出現busy loop
                if (writeCompleteCallback_)        // 回調writeCompleteCallback_
                {
                    // 應用層發送緩沖區被清空,就回調用writeCompleteCallback_
                    // 發送給IO線程進行處理
                    loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
                }
                if (state_ == kDisconnecting)    // 發送緩沖區已清空並且連接狀態是kDisconnecting, 要關閉連接
                {
                    shutdownInLoop();        // 關閉寫連接
                }
            } else {
                LOG_TRACE << "I am going to write more data";
            }
        } else {
            LOG_SYSERR << "TcpConnection::handleWrite";
            // if (state_ == kDisconnecting)
            // {
            //   shutdownInLoop();
            // }
        }
    } else {
        LOG_TRACE << "Connection fd = " << channel_->fd()
                  << " is down, no more writing";
    }
}

void TcpConnection::handleClose()//關閉事件處理,也是epoll如果發生關閉事件的回調函數
{
    loop_->assertInLoopThread();
    LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
    assert(state_ == kConnected || state_ == kDisconnecting);
    // we don't close fd, leave it to dtor, so we can find leaks easily.
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr guardThis(shared_from_this());
    connectionCallback_(guardThis);        // 在結束前,最后一次處理一下,這一行,可以不調用
    LOG_TRACE << "[7] usecount=" << guardThis.use_count();
    // must be the last line
    closeCallback_(guardThis);    // 調用TcpServer::removeConnection
    LOG_TRACE << "[11] usecount=" << guardThis.use_count();
}

void TcpConnection::handleError()//處理錯誤的函數,也是epoll如果發生錯誤事件的回調函數
{
    int err = sockets::getSocketError(channel_->fd());
    LOG_ERROR << "TcpConnection::handleError [" << name_
              << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM