Introduction
The TcpConnection, TcpServer, and Acceptor classes are the core building blocks of a non-blocking TCP networking library.
This article analyzes the muduo source code for the first two. (The Acceptor class was covered in the previous post.)
In muduo's single-threaded design, one EventLoop handles every event: connection establishment, I/O, computation, and connection teardown. In the multi-threaded design, each I/O thread runs exactly one EventLoop (one loop per thread), and every connection is bound to one of those loops.
TcpServer
A multi-threaded muduo::TcpServer is built mainly by adding an EventLoopThreadPool, a pool of event-loop threads. Whenever a new TcpConnection is created, one loop is picked from the pool and handed to that TcpConnection. In other words, the multi-threaded TcpServer's own EventLoop is used only to accept new connections, while the new connections do their I/O on other EventLoops. (A single-threaded TcpServer shares its EventLoop with its TcpConnections.)
EventLoopThreadPool implements the multi-threaded TcpServer following the one-loop-per-thread idea. The main thread's loop is responsible only for accepting TCP connections and distributing work: to have a particular thread do some work, simply register the timer or I/O object (for example a TCP connection) with that thread's loop. A connection with real-time requirements can get a thread of its own; a connection carrying a large data volume can likewise monopolize a thread, with the data processing farmed out to a separate pool of computation threads; the remaining, less important connections share one thread.
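To make "register the timer or I/O with that thread's loop" concrete, here is a minimal sketch, assuming muduo's EventLoopThread and EventLoop::runEvery interfaces; the 5-second interval and the log message are illustrative only and are not part of the source analyzed below.

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>

using namespace muduo;
using namespace muduo::net;

void housekeeping()
{
  LOG_INFO << "periodic job running in its dedicated loop";
}

int main()
{
  EventLoop mainLoop;                              // would normally drive the Acceptor

  EventLoopThread workerThread;                    // spawns one I/O thread
  EventLoop* workerLoop = workerThread.startLoop();  // returns that thread's loop

  // Register work with that specific loop; a TcpConnection could be assigned the
  // same way, this sketch simply installs a repeating timer on the dedicated loop.
  workerLoop->runEvery(5.0, housekeeping);

  mainLoop.loop();                                 // the main thread keeps running its own loop
}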
The thread-pool design pattern
A pool is a design pattern and a thread is a resource. The thread-pool pattern usually acquires a fixed set of resources up front; when a resource is needed it is taken from the pool, and it is returned to the pool when the work is done. EventLoopThreadPool borrows this pattern. Although its threads are never actually returned to the pool, each thread's EventLoop is itself a Reactor and already has its own waiting mechanism.
Every time TcpServer creates a new TcpConnection it calls EventLoopThreadPool::getNextLoop() to obtain an EventLoop. The current getNextLoop() simply takes loops from the pool in round-robin order; as long as every TcpConnection needs roughly the same amount of service, this spreads system resources quite evenly.
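For illustration, a simplified re-implementation of the round-robin pick might look like the sketch below (this is not the library's exact code; baseLoop_, loops_ and next_ model the members one would expect, and in the real pool start() fills loops_ with one loop per thread).

#include <vector>
#include <muduo/net/EventLoop.h>

using muduo::net::EventLoop;

class RoundRobinPicker
{
 public:
  explicit RoundRobinPicker(EventLoop* baseLoop) : baseLoop_(baseLoop), next_(0) {}

  EventLoop* getNextLoop()
  {
    EventLoop* loop = baseLoop_;              // with zero threads, fall back to the base loop
    if (!loops_.empty())
    {
      loop = loops_[next_];                   // hand out loops in order...
      next_ = (next_ + 1) % loops_.size();    // ...and wrap around at the end
    }
    return loop;
  }

 private:
  EventLoop* baseLoop_;                       // the acceptor's loop
  std::vector<EventLoop*> loops_;             // loops owned by the pool's threads
  size_t next_;                               // round-robin cursor
};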
How TcpServer operates depends on how many threads the EventLoopThreadPool is told to create:
0 means all I/O runs in TcpServer's own event loop; no new thread is created.
1 means all I/O runs in one other thread, and TcpServer's own thread only accepts connections.
N means new connections are assigned to the N threads in round-robin fashion (a usage sketch follows).
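As a usage sketch, assuming a trivial echo callback; the port number, thread count and server name are made up for illustration and do not come from the source below.

#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpServer.h>

using namespace muduo;
using namespace muduo::net;

// Echo whatever arrives straight back to the peer.
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)
{
  conn->send(buf);
}

int main()
{
  EventLoop loop;                                  // main loop: accepts connections only
  InetAddress listenAddr(2007);                    // port chosen for illustration
  TcpServer server(&loop, listenAddr, "EchoServer");

  server.setMessageCallback(onMessage);
  server.setThreadNum(3);   // N = 3: connections are dealt to three I/O loops round-robin
  server.start();

  loop.loop();
}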
The annotated source follows.
TcpServer.h
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

/* This class essentially glues TcpConnection and Acceptor together, so it can carry out
 * TCP communication on its own -- all the socket calls are covered.
 * Summary of the whole TCP flow:
 * A TcpServer owns one Acceptor, one EventLoopThreadPool, and a map of TcpConnection pointers.
 * In TcpServer::start(), the EventLoopThreadPool is started first, then Acceptor::listen is
 * run inside the event loop.
 * The TcpServer constructor already binds its new-connection handler to the Acceptor's
 * connection callback. When the Acceptor's listening fd becomes readable, it accepts the
 * connection and then invokes newConnectionCallback_ to hand it over; each connection gets
 * its own TcpConnection object, which also owns the buffers.
 */
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H

#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>

#include <map>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

namespace muduo
{
namespace net
{

class Acceptor;
class EventLoop;
class EventLoopThreadPool;

///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,
            const InetAddress& listenAddr,
            const string& nameArg);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.

  const string& hostport() const { return hostport_; }
  const string& name() const { return name_; }

  /// Set the number of threads for handling input.
  ///
  /// Always accepts new connection in loop's thread.
  /// Must be called before @c start
  /// @param numThreads
  /// - 0 means all I/O in loop's thread, no thread will be created.
  ///   this is the default value.
  /// - 1 means all I/O in another thread.
  /// - N means a thread pool with N threads, new connections
  ///   are assigned on a round-robin basis.
  void setThreadNum(int numThreads);
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }  // this callback is passed as the argument to EventLoopThreadPool::start

  /// Starts the server if it's not listenning.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();

  /// Set connection callback.
  /// Not thread safe.
  // Sets the connection-established / connection-closed callback. This function pointer is
  // copied into TcpConnection::connectionCallback_ and is invoked right after a connection
  // is established and right before it is torn down.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  // Sets the message-arrival callback. This function pointer is invoked from
  // TcpConnection::handleRead, i.e. it is part of the read handler of TcpConnection's Channel.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Set write complete callback.
  /// Not thread safe.
  /// Called after a message has been fully sent; copied into TcpConnection::writeCompleteCallback_.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

 private:
  /// Not thread safe, but in loop
  // Copied into Acceptor::newConnectionCallback_; called after a new connection is accepted.
  void newConnection(int sockfd, const InetAddress& peerAddr);
  /// Thread safe.
  /// Copied into TcpConnection::closeCallback_; called after the connection fd has been closed.
  void removeConnection(const TcpConnectionPtr& conn);
  /// Not thread safe, but in loop; called from removeConnection above.
  void removeConnectionInLoop(const TcpConnectionPtr& conn);

  typedef std::map<string, TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;  // the acceptor loop
  const string hostport_;   // ip:port the server listens on
  const string name_;       // server name
  boost::scoped_ptr<Acceptor> acceptor_;  // avoid revealing Acceptor
  boost::scoped_ptr<EventLoopThreadPool> threadPool_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;  // called back once the data has been sent out
  ThreadInitCallback threadInitCallback_;  // called by each IO-pool thread before it enters its event loop
  bool started_;
  // always in loop thread
  int nextConnId_;   // id of the next connection; incremented by one for each new connection
  ConnectionMap connections_;  // connection list
};

}
}

#endif  // MUDUO_NET_TCPSERVER_H
TcpServer.cc
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpServer.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <stdio.h>  // snprintf

using namespace muduo;
using namespace muduo::net;

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    hostport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr)),
    threadPool_(new EventLoopThreadPool(loop)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    started_(false),
    nextConnId_(1)
{
  // Acceptor::handleRead will call back TcpServer::newConnection.
  // _1 is the connected socket fd, _2 is the peer address (InetAddress).
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}

TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (ConnectionMap::iterator it(connections_.begin());
       it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn = it->second;
    it->second.reset();   // release the managed object; use count drops by one
    conn->getLoop()->runInLoop(
        boost::bind(&TcpConnection::connectDestroyed, conn));
    conn.reset();         // release the managed object; use count drops by one
  }
}

void TcpServer::setThreadNum(int numThreads)
{
  assert(0 <= numThreads);
  threadPool_->setThreadNum(numThreads);
}

// Harmless to call multiple times.
// May be called across threads.
void TcpServer::start()
{
  if (!started_)
  {
    started_ = true;
    threadPool_->start(threadInitCallback_);
  }

  if (!acceptor_->listenning())
  {
    // get_pointer returns the raw pointer
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)  // called after a new connection is accepted
{
  loop_->assertInLoopThread();
  // pick an EventLoop in round-robin order
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[32];
  snprintf(buf, sizeof buf, ":%s#%d", hostport_.c_str(), nextConnId_);  // buf holds "ip:port#nextConnId_"
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  /*TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));*/
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  LOG_TRACE << "[1] usecount=" << conn.use_count();
  // Copy the connection name and the TcpConnection pointer into the connection map, so
  // two shared_ptrs now point to conn. Without this line the connection would be destroyed
  // as soon as newConnection returns; it also means the map entry must be erased when the
  // connection is really removed.
  connections_[connName] = conn;
  LOG_TRACE << "[2] usecount=" << conn.use_count();
  // install the callbacks
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);  // safe to set even if empty; it is checked before use

  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1));

  // conn->connectEstablished();
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
  // My understanding: when boost::bind binds a member function, it takes one extra leading
  // argument, the object the call is made on; usually that is `this`, here it is the
  // TcpConnectionPtr.
  LOG_TRACE << "[5] usecount=" << conn.use_count();
}

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  /*
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();

  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();

  (void)n;
  assert(n == 1);

  loop_->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count();
  */
  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)  // remove the TcpConnection from its EventLoop
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();

  LOG_TRACE << "[8] usecount=" << conn.use_count();
  size_t n = connections_.erase(conn->name());
  LOG_TRACE << "[9] usecount=" << conn.use_count();

  (void)n;
  assert(n == 1);

  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));

  //loop_->queueInLoop(
  //    boost::bind(&TcpConnection::connectDestroyed, conn));
  LOG_TRACE << "[10] usecount=" << conn.use_count();
}
TcpConnection
The TcpConnection class encapsulates a single TCP connection. It registers callbacks with its Channel (readable, writable, close, error); when those events later fire on the Channel, the corresponding callbacks perform the data transfer or the error handling.
TcpConnection is managed through a shared_ptr because its lifetime is ambiguous. A TcpConnection represents a connection that is established or in the process of being established. Once the connection is up, the user only needs to set the connection and message handlers on the owning class (such as TcpServer), which in turn passes them to TcpConnection via setConnectionCallback and setMessageCallback. The events the user has to care about are limited; everything else is handled by the library.
TcpConnection contains an input buffer and an output buffer, the application-level buffers. When sending, if the data in the buffer cannot all be written out at once, the connection keeps watching the Channel's writable event and continues sending when the sockfd becomes writable again.
As mentioned above, TcpConnection's lifetime is ambiguous: we cannot simply erase the TcpConnection object inside TcpServer, because Channel::handleEvent may still be executing at that moment. If the TcpConnection were destroyed then, its channel_ member would be destroyed with it and the program would core-dump. In other words, the TcpConnection object must outlive handleEvent(); it is only destroyed after connectDestroyed() has run.
Closing a connection
TcpConnection is closed passively: the peer closes first, and once the local read(2) returns 0 the call sequence is:
handleClose() -> TcpServer::removeConnection() -> TcpConnection::connectDestroyed().
Connection-teardown sequence diagram (the original figure is not reproduced here).
When a connection arrives, a TcpConnection object is created and immediately managed by a shared_ptr, so the use count is 1. The Channel keeps a weak_ptr (tie_); assigning the shared_ptr to tie_ leaves the use count at 1. When the connection closes, handleEvent promotes tie_ to a shared_ptr, raising the use count to 2. As long as the shared_ptr count is non-zero, the TcpConnection cannot be destroyed.
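The promotion in handleEvent boils down to weak_ptr::lock(). Below is a stripped-down model of the tie mechanism, patterned after Channel but not the actual class; the names ChannelLike and handleEventWithGuard are used here purely for illustration.

#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

// Stripped-down model of Channel's tie_ mechanism (illustration only).
class ChannelLike
{
 public:
  ChannelLike() : tied_(false) {}

  // Called with shared_from_this() (as in TcpConnection::connectEstablished):
  // only a weak reference is stored, so the use count stays unchanged.
  void tie(const boost::shared_ptr<void>& obj)
  {
    tie_ = obj;
    tied_ = true;
  }

  void handleEvent()
  {
    if (tied_)
    {
      // Promote the weak_ptr; if it succeeds, `guard` keeps the owner
      // (the TcpConnection) alive for the rest of this function.
      boost::shared_ptr<void> guard = tie_.lock();
      if (guard)
      {
        handleEventWithGuard();   // safe: the owner cannot be destroyed mid-call
      }
    }
    else
    {
      handleEventWithGuard();
    }
  }

 private:
  void handleEventWithGuard() { /* dispatch read/write/close/error callbacks */ }

  boost::weak_ptr<void> tie_;
  bool tied_;
};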
TcpConnection.h source analysis
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

/* As I understand it, this class works together with the Buffer class as the bridge for
 * non-blocking I/O: it reads incoming data from the file descriptor into the receive buffer,
 * and writes user data (or the contents of the output buffer when a writable event fires)
 * back to the corresponding file descriptor.
 */
#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include <muduo/base/Mutex.h>
#include <muduo/base/StringPiece.h>
#include <muduo/base/Types.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/InetAddress.h>

#include <boost/any.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>

namespace muduo
{
namespace net
{

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  /// Constructs a TcpConnection with a connected sockfd
  ///
  /// User should not create this object.
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  EventLoop* getLoop() const { return loop_; }  // the EventLoop this TcpConnection belongs to
  const string& name() const { return name_; }
  const InetAddress& localAddress() { return localAddr_; }
  const InetAddress& peerAddress() { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }

  // void send(string&& message); // C++11
  void send(const void* message, size_t len);
  void send(const StringPiece& message);
  // void send(Buffer&& message); // C++11
  void send(Buffer* message);  // this one will swap data
  void shutdown();  // NOT thread safe, no simultaneous calling
  void setTcpNoDelay(bool on);

  void setContext(const boost::any& context)
  { context_ = context; }

  const boost::any& getContext() const  // read-only access to context_
  { return context_; }

  boost::any* getMutableContext()  // mutable access to context_
  { return &context_; }

  // Called from handleClose, connectEstablished and connectDestroyed. As I understand it,
  // this callback mainly serves a reporting role: invoked when the connection is established
  // and before it is torn down, to report the connection state.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  // Called from handleRead; effectively part of the Channel's read handler.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  // Called from handleWrite and sendInLoop once the data has been written out.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

  // Both are used in sendInLoop.
  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

  Buffer* inputBuffer()
  { return &inputBuffer_; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }  // invoked from handleClose

  // called when TcpServer accepts a new connection
  void connectEstablished();   // should be called only once
  // called when TcpServer has removed me from its map
  void connectDestroyed();  // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };

  void handleRead(Timestamp receiveTime);  // bound as channel_'s read callback
  void handleWrite();                      // bound as channel_'s write callback
  void handleClose();                      // bound as channel_'s close callback; also called from handleRead
  void handleError();                      // bound as channel_'s error callback
  void sendInLoop(const StringPiece& message);
  void sendInLoop(const void* message, size_t len);
  void shutdownInLoop();
  void setState(StateE s) { state_ = s; }  // set the connection state

  EventLoop* loop_;     // the owning EventLoop
  string name_;         // connection name
  StateE state_;        // connection state; FIXME: use atomic variable
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;
  boost::scoped_ptr<Channel> channel_;
  // In TcpServer, channel_ wraps the connected socket (the connfd returned by Socket::accept).
  // In TcpClient it wraps the socket created by socket(), since a client only needs one fd.
  InetAddress localAddr_;  // local (server-side) address
  InetAddress peerAddr_;   // peer (client-side) address
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;  // called when all user data has been copied into the kernel
                                                 // buffer, i.e. when outputBuffer_ becomes empty; can be seen
                                                 // as a low-water-mark callback
  HighWaterMarkCallback highWaterMarkCallback_;  // high-water-mark callback
  CloseCallback closeCallback_;
  size_t highWaterMark_;   // high-water mark
  Buffer inputBuffer_;     // application-level receive buffer
  Buffer outputBuffer_;    // application-level send buffer
  boost::any context_;     // an opaque context object, typically used to hold an HttpContext
};

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;

}
}

#endif  // MUDUO_NET_TCPCONNECTION_H
TcpConnection.cc source analysis
// Copyright 2010, Shuo Chen. All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include <muduo/net/TcpConnection.h>

#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Socket.h>
#include <muduo/net/SocketsOps.h>

#include <boost/bind.hpp>

#include <errno.h>
#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)  // default connection callback: log the connection state
{
  LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
}

// Default message callback: move the read and write indices back to their initial
// positions, i.e. discard everything in the buffer.
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
  buf->retrieveAll();
}

TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),        // owning EventLoop
    name_(nameArg),                    // connection name
    state_(kConnecting),               // connection state
    socket_(new Socket(sockfd)),       // connected socket
    channel_(new Channel(loop, sockfd)),
    // In TcpServer, channel_ wraps the connected socket (the connfd returned by Socket::accept).
    // In TcpClient it wraps the socket created by socket(), since a client only needs one fd.
    localAddr_(localAddr),             // local (server-side) address
    peerAddr_(peerAddr),               // peer (client-side) address
    highWaterMark_(64 * 1024 * 1024)
{
  // When the channel becomes readable, call back TcpConnection::handleRead; _1 is the event timestamp.
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  // When the channel becomes writable, call back TcpConnection::handleWrite.
  channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));
  // When the connection closes, call back TcpConnection::handleClose.
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  // On error, call back TcpConnection::handleError.
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);  // probe the connection periodically, similar to a heartbeat
}

TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
            << " fd=" << channel_->fd();
}

// Thread safe; may be called across threads.
void TcpConnection::send(const void* data, size_t len)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(data, len);
    }
    else
    {
      string message(static_cast<const char*>(data), len);
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop, this, message));
    }
  }
}

// Thread safe; may be called across threads.
void TcpConnection::send(const StringPiece& message)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(message);
    }
    else
    {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop, this, message.as_string()));
                    //std::forward<string>(message)));
    }
  }
}

// Thread safe; may be called across threads.
void TcpConnection::send(Buffer* buf)
{
  if (state_ == kConnected)
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(buf->peek(), buf->readableBytes());
      buf->retrieveAll();
    }
    else
    {
      loop_->runInLoop(
          boost::bind(&TcpConnection::sendInLoop, this, buf->retrieveAllAsString()));
                    //std::forward<string>(message)));
    }
  }
}

void TcpConnection::sendInLoop(const StringPiece& message)
{
  sendInLoop(message.data(), message.size());
}

// ??? Both this function and handleWrite write to the file descriptor -- what is the difference?
void TcpConnection::sendInLoop(const void* data, size_t len)
{
  /*
  loop_->assertInLoopThread();
  sockets::write(channel_->fd(), data, len);
  */
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool error = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  // channel is not watching the writable event and the send buffer is empty: write directly
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      // everything was written: call back writeCompleteCallback_
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE) // FIXME: any others?
        {
          error = true;
        }
      }
    }
  }

  assert(remaining <= len);
  // No error and some data is still unwritten (the kernel send buffer is full),
  // so append the rest to the output buffer.
  if (!error && remaining > 0)
  {
    LOG_TRACE << "I am going to write more data";
    size_t oldLen = outputBuffer_.readableBytes();
    // if the buffered size crosses highWaterMark_ (the high-water mark), call back highWaterMarkCallback_
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(boost::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);  // store the rest in the application send buffer
    if (!channel_->isWriting())
    {
      channel_->enableWriting();  // start watching the POLLOUT event
    }
  }
}

void TcpConnection::shutdown()  // close the connection
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(boost::bind(&TcpConnection::shutdownInLoop, this));
  }
}

void TcpConnection::shutdownInLoop()  // close the write half in the loop; reading is still possible
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}

void TcpConnection::setTcpNoDelay(bool on)  // toggle TCP_NODELAY (Nagle's algorithm) on the socket
{
  socket_->setTcpNoDelay(on);
}

// Register this TcpConnection's channel with the EventLoop owned by the TcpServer or TcpClient.
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);  // must currently be in the connecting state
  setState(kConnected);
  LOG_TRACE << "[3] usecount=" << shared_from_this().use_count();
  channel_->tie(shared_from_this());
  channel_->enableReading();  // register this connection's channel with the Poller
  connectionCallback_(shared_from_this());
  LOG_TRACE << "[4] usecount=" << shared_from_this().use_count();
}

void TcpConnection::connectDestroyed()  // tear down the connection; remove it from the EventLoop's epoll set
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
  }
  channel_->remove();  // remove the channel from the epoll set
}

void TcpConnection::handleRead(Timestamp receiveTime)  // read event handler
{
  /*
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  /*
  loop_->assertInLoopThread();
  int savedErrno = 0;
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), buf, n);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
  */
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);  // read straight into inputBuffer_
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();  // read returned 0: the peer closed, so close this side as well
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}

// Called when a writable event is reported; the data to send has already been put into
// outputBuffer_, so the write starts from its read index.
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())  // is the writable event being watched?
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());  // write to the file descriptor
    if (n > 0)
    {
      outputBuffer_.retrieve(n);  // advance the read index
      if (outputBuffer_.readableBytes() == 0)  // the send buffer has been drained
      {
        channel_->disableWriting();  // stop watching POLLOUT to avoid a busy loop
        if (writeCompleteCallback_)  // call back writeCompleteCallback_
        {
          // the application send buffer is empty, so queue writeCompleteCallback_
          // on the IO thread
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)  // buffer drained and state is kDisconnecting: close the connection
        {
          shutdownInLoop();  // shut down the write half
        }
      }
      else
      {
        LOG_TRACE << "I am going to write more data";
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

void TcpConnection::handleClose()  // close handler; also the callback for a close event from epoll
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << state_;
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);
  channel_->disableAll();

  TcpConnectionPtr guardThis(shared_from_this());
  connectionCallback_(guardThis);  // report one last time before tearing down; this call could be omitted
  LOG_TRACE << "[7] usecount=" << guardThis.use_count();
  // must be the last line
  closeCallback_(guardThis);  // calls TcpServer::removeConnection
  LOG_TRACE << "[11] usecount=" << guardThis.use_count();
}

void TcpConnection::handleError()  // error handler; also the callback for an error event from epoll
{
  int err = sockets::getSocketError(channel_->fd());
  LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}
