Envoy 源碼分析--network
申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。
Envoy
的服務是通用服務,因此它需要支持 TCP
和 UDP
,同時還需支持 IPV4
和 IPV6
兩種網絡協議,所以網絡模塊有點復雜。本次分析的網絡模塊是底層的模塊,沒有一整個服務的啟動流程,有的地方可能還串不起來。現在先來看下UML類圖:
類圖看上去略顯復雜,主要分為4塊:addres
,socket
,listen
和 connection
。
address
是地址相關的,主要包括IPV4
,IPV6
,PIPE
,DNS
和cidr
。socket
是socket
相關的操作,主要包括ListenSocket
,ConnectionSocket
,TransportSocket
以及option
。listen
是網絡監聽操作,包括TCP
監聽和UDP
監聽。connection
是連接相關操作。關於 L3/4 過濾的這次暫時不分析,后續再講。
address
InstanceBase
繼承自 Instance
是所有地址類型的基類。Ipv4Instance
,Ipv6Instance
和 PipeInstance
三個地址類都是繼承 InstanceBase
。 DNS
解析類使用 c-ares
庫,DnsResolverImpl
只是對 c-ares
的進一步封裝。 CidrRange
是對 cidr 操作相關。
系統操作返回的值和錯誤信息封裝成一個公用的結構體。具體如下:
template <typename T> struct SysCallResult {
//系統返回值
T rc_;
//系統返回的錯誤信息
int errno_;
};
Instance
Ipv4Instance
,Ipv6Instance
和 PipeInstance
三個地址類都是繼承 InstanceBase
。它們的實現基本都差不多,socket()
、bind()
和 connect()
這三個基礎操作都屬於它們的成員。現在我們主要來看下 Ipv4Instance
幾個主要的操作(其它兩個類類似就不再分析)。
Ipv4Instance
的類里有個私有結構體 IpHelper
。這結構體封裝着 IPV4
地址的具體內容,比如端口,版本等
struct IpHelper : public Ip {
const std::string& addressAsString() const override { return friendly_address_; }
bool isAnyAddress() const override { return ipv4_.address_.sin_addr.s_addr == INADDR_ANY; }
bool isUnicastAddress() const override {
return !isAnyAddress() && (ipv4_.address_.sin_addr.s_addr != INADDR_BROADCAST) &&
// inlined IN_MULTICAST() to avoid byte swapping
!((ipv4_.address_.sin_addr.s_addr & htonl(0xf0000000)) == htonl(0xe0000000));
}
const Ipv4* ipv4() const override { return &ipv4_; }
const Ipv6* ipv6() const override { return nullptr; }
uint32_t port() const override { return ntohs(ipv4_.address_.sin_port); }
IpVersion version() const override { return IpVersion::v4; }
Ipv4Helper ipv4_;
std::string friendly_address_;
};
bind()
,socket()
和 connect()
基本都是直接調的底層函數。
Api::SysCallIntResult Ipv6Instance::bind(int fd) const {
const int rc = ::bind(fd, reinterpret_cast<const sockaddr*>(&ip_.ipv6_.address_),
sizeof(ip_.ipv6_.address_));
return {rc, errno};
}
Api::SysCallIntResult Ipv6Instance::connect(int fd) const {
const int rc = ::connect(fd, reinterpret_cast<const sockaddr*>(&ip_.ipv6_.address_),
sizeof(ip_.ipv6_.address_));
return {rc, errno};
}
DNS
DNS
使用 c-ares 作為底層庫。 c-ares
是個 c 實現的異步 DNS 解析庫,很多知名軟件(curl,Nodejs,gevent 等)都使用了該庫。
c-ares
在構造函數內初始化庫,初始化上下文,然后設置 DNS 服務器。
DnsResolverImpl::DnsResolverImpl(
Event::Dispatcher& dispatcher,
const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers)
: dispatcher_(dispatcher),
timer_(dispatcher.createTimer([this] { onEventCallback(ARES_SOCKET_BAD, 0); })) {
//初始化庫
ares_library_init(ARES_LIB_INIT_ALL);
ares_options options;
//初始化上下文
initializeChannel(&options, 0);
... ...
const std::string resolvers_csv = StringUtil::join(resolver_addrs, ",");
//設置 DNS 服務器
int result = ares_set_servers_ports_csv(channel_, resolvers_csv.c_str());
}
使用時直接 resolve()
結果返回在 callback 里。
ActiveDnsQuery* DnsResolverImpl::resolve(const std::string& dns_name,
DnsLookupFamily dns_lookup_family, ResolveCb callback) {
... ...
if (dns_lookup_family == DnsLookupFamily::V4Only) {
pending_resolution->getHostByName(AF_INET);
} else {
pending_resolution->getHostByName(AF_INET6);
}
... ...
}
void DnsResolverImpl::PendingResolution::getHostByName(int family) {
ares_gethostbyname(channel_, dns_name_.c_str(), family,
[](void* arg, int status, int timeouts, hostent* hostent) {
static_cast<PendingResolution*>(arg)->onAresHostCallback(status, timeouts, hostent);
},
this);
}
void DnsResolverImpl::PendingResolution::onAresHostCallback(int status, int timeouts, hostent* hostent) {
... ...
//解析內容加入address_list
std::list<Address::InstanceConstSharedPtr> address_list;
if (status == ARES_SUCCESS) {
if (hostent->h_addrtype == AF_INET) {
for (int i = 0; hostent->h_addr_list[i] != nullptr; ++i) {
ASSERT(hostent->h_length == sizeof(in_addr));
sockaddr_in address;
memset(&address, 0, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = 0;
address.sin_addr = *reinterpret_cast<in_addr*>(hostent->h_addr_list[i]);
address_list.emplace_back(new Address::Ipv4Instance(&address));
}
... ...
}
if (completed_) {
if (!cancelled_) {
try {
//調用回調
callback_(std::move(address_list));
} catch (const EnvoyException& e) {
... ...
}
cidr
cidr
的定義是形如 192.168.0.1/24 的 IP 段。想知道具體的定義和 IP 段 可看 cidr。
CidrRange
將 cidr
拆分成兩字段地址和長度。下面是判斷地址是否屬於這個 IP 段。
bool CidrRange::isInRange(const Instance& address) const {
... ...
//長度為0,全匹配(length_初始值為-1)
if (length_ == 0) {
return true;
}
switch (address.ip()->version()) {
case IpVersion::v4:
if (ntohl(address.ip()->ipv4()->address()) >> (32 - length_) ==
ntohl(address_->ip()->ipv4()->address()) >> (32 - length_)) {
return true;
}
break;
case IpVersion::v6:
if ((Utility::Ip6ntohl(address_->ip()->ipv6()->address()) >> (128 - length_)) ==
(Utility::Ip6ntohl(address.ip()->ipv6()->address()) >> (128 - length_))) {
return true;
}
break;
}
return false;
}
socket
我們都知道,創建 TCP 服務時,監聽的 fd 和連接的 fd 是不一樣的,因此 socket
分為 ListenSocket
和 ConnectionSocket
。socket
里有很多的配置(比如讀超時,寫超時等)都是調用setsockopt
,所有需要一個 Option
來進行統一的封裝。
Option
Option
是對 setsockopt
這個函數操作的封裝。封裝后再用智能指針的方式進行操作。
typedef std::shared_ptr<const Option> OptionConstSharedPtr;
typedef std::vector<OptionConstSharedPtr> Options;
typedef std::shared_ptr<Options> OptionsSharedPtr;
Option
在全部設置完后,在 applyOptions
后,最終還是調用 setsockopt
。
static bool applyOptions(const OptionsSharedPtr& options, Socket& socket,
envoy::api::v2::core::SocketOption::SocketState state) {
if (options == nullptr) {
return true;
}
for (const auto& option : *options) {
//對所有的option 進行設置
if (!option->setOption(socket, state)) {
return false;
}
}
return true;
}
bool SocketOptionImpl::setOption(Socket& socket,
envoy::api::v2::core::SocketOption::SocketState state) const {
if (in_state_ == state) {
//調用成員函數 setSocketOption
const Api::SysCallIntResult result = SocketOptionImpl::setSocketOption(socket, optname_, value_);
... ...
return true;
}
Api::SysCallIntResult SocketOptionImpl::setSocketOption(Socket& socket, Network::SocketOptionName optname, const absl::string_view value) {
... ...
//最終調用系統函數setsockopt
return os_syscalls.setsockopt(socket.ioHandle().fd(), optname.value().first,
optname.value().second, value.data(), value.size());
}
Socket
Socket
提供基本的 socket 操作。主要是 'Option' 操作(上面已分析過)和地址操作。代碼比較簡單。
//設置和獲取本地地址
const Address::InstanceConstSharedPtr& localAddress() const override { return local_address_; }
void setLocalAddress(const Address::InstanceConstSharedPtr& local_address) override {
local_address_ = local_address;
}
ListenSocket
ListenSocket
是對監聽 fd 的封裝,繼承自 Socket
。主要操作自然就是 bind()
。bind 調用自地址類的 bind() 函數(看上面的 address)。
void ListenSocketImpl::doBind() {
// 地址和handle 繼承自socket。調用地址類的 bind。
const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
if (result.rc_ == -1) {
close();
throw SocketBindException(
fmt::format("cannot bind '{}': {}", local_address_->asString(), strerror(result.errno_)),
result.errno_);
}
if (local_address_->type() == Address::Type::Ip && local_address_->ip()->port() == 0) {
// If the port we bind is zero, then the OS will pick a free port for us (assuming there are
// any), and we need to find out the port number that the OS picked.
local_address_ = Address::addressFromFd(io_handle_->fd());
}
ConnectionSocket
ConnectionSocket
是對連接 fd 的封裝,除了 Socket
的基本操作外,還增加對遠程地址和協議的設置。
//設置和獲取遠程地址
const Address::InstanceConstSharedPtr& remoteAddress() const override { return remote_address_; }
void setRemoteAddress(const Address::InstanceConstSharedPtr& remote_address) override {
remote_address_ = remote_address;
}
//協議相關
void setDetectedTransportProtocol(absl::string_view protocol) override {
transport_protocol_ = std::string(protocol);
}
absl::string_view detectedTransportProtocol() const override { return transport_protocol_; }
TransportSocket
TransportSocket
是一個實際讀/寫的傳輸套接字。它可以對數據進行一些轉換(比如TLS,TCP代理等)。 TransportSocket
提供了多個接口。
failureReason()
返回最后的一個錯誤,沒錯誤返回空值。
canFlushClose()
socket 是否能刷新和關閉。
closeSocket()
關閉 socket。
doRead()
讀取數據。
doWrite()
寫數據。
onConnected()
transport 連接時調用此函數。
Ssl::ConnectionInfo* ssl()
Ssl連接數據。
listen
listen
是對監聽操作相關的類,分為 TcpListen
和 UdpListen
。 Listen 抽象類只提供兩個接口 disable
和 enable
。 disable
關閉接受新連接,enable
開啟接受新連接。
ListenerImpl
實現那兩接口的同時,由於它是 TCP 的監聽必然就有 listen 和 accept 操作。在構造函數時,調用 setupServerSocket 創造 listen,啟用回調
void ListenerImpl::
setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
//創建監聽,完成后回調 listenCallback
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
... ...
//失敗回調errorCallback
evconnlistener_set_error_cb(listener_.get(), errorCallback);
}
監聽完成后,調用 listenCallback。listenCallback 用回調函數調用 onAccept 接收連接。
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(fd);
// 獲取本地地址
const Address::InstanceConstSharedPtr& local_address =
listener->local_address_ ? listener->local_address_
: listener->getLocalAddress(io_handle->fd());
// 獲取遠程地址
const Address::InstanceConstSharedPtr& remote_address =
(remote_addr->sa_family == AF_UNIX)
? Address::peerAddressFromFd(io_handle->fd())
: Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr),
remote_addr_len,
local_address->ip()->version() == Address::IpVersion::v6);
//調用 onAccept,
listener->cb_.onAccept(
std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address),
listener->hand_off_restored_destination_connections_);
}
connection
connection
是連接相關的操作,客戶端和服務端的連接都屬於這個類。 Connection
是針對原始連接的一個抽象,繼承自 DeferredDeletable
和 FilterManager
。關於 DeferredDeletable
延遲析構請看 Envoy 源碼分析--event,FilterManager
以后討論。
ConnectionImpl
ConnectionImpl
是 Connection
,BufferSource
和 TransportSocketCallbacks
三個抽象類的實現類。Connection
是連接操作相關的類,BufferSource
是獲得 StreamBuffer 的抽象類(包括讀和寫),TransportSocketCallbacks
是傳輸套接字實例與連接進行通信的回調。
每個 ConnectionImpl
實例都有一個唯一的全局ID。在構造時賦值。
std::atomic<uint64_t> ConnectionImpl::next_global_id_;
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) : id_(next_global_id_++) {
}
ConnectionImpl
事件由 dispatcher_
創建。在構造函數時創建事件。
Event 使用邊緣觸發,減少內核通知,提高效率(水平觸發和邊緣觸發區別大家自己查閱相關文檔)。同時寫入讀寫事件。當有讀寫事件時,會觸發回調 onFileEvent
。
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,TransportSocketPtr&& transport_socket, bool connected) {
... ...
file_event_ = dispatcher_.createFileEvent(
ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); },
Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
... ...
}
onFileEvent
在收到事件后,對不同的事件進行不同的處理。
void ConnectionImpl::onFileEvent(uint32_t events) {
... ...
// 寫事件
if (events & Event::FileReadyType::Write) {
onWriteReady();
}
// 讀事件
if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
onReadReady();
}
}
對於讀事件,在連接調用 readDisable
后,如果是 enable 會觸發讀事件。
void ConnectionImpl::readDisable(bool disable) {
... ...
read_enabled_ = true;
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
if (read_buffer_.length() > 0) {
file_event_->activate(Event::FileReadyType::Read);
}
}
讀事件調用 onReadReady
,onReadReady
先從 buffer中讀取數據,同時更新統計數據。對返回的結果進行分析,已關閉直接關閉。正常讀到數據,判斷是否有數據,有數據會調用 onRead
, onRead
內會調用 ReadFilter 進行下一步處理(L3/4過濾下次分析)。
void ConnectionImpl::onReadReady() {
... ...
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
if ((!enable_half_close_ && result.end_stream_read_)) {
result.end_stream_read_ = false;
result.action_ = PostIoAction::Close;
}
read_end_stream_ |= result.end_stream_read_;
//有讀到數據
if (result.bytes_processed_ != 0 || result.end_stream_read_)
onRead(new_buffer_size);
}
// 關閉連接
if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
ENVOY_CONN_LOG(debug, "remote close", *this);
closeSocket(ConnectionEvent::RemoteClose);
}
}
對於寫事件,在連接寫入數據時,會將數據先進行過濾,然后寫入寫緩沖。之后調用寫事件觸發 onFileEvent
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {
... ...
// WriteFilter過濾
current_write_buffer_ = &data;
current_write_end_stream_ = end_stream;
FilterStatus status = filter_manager_.onWrite();
current_write_buffer_ = nullptr;
if (FilterStatus::StopIteration == status) {
return;
}
write_end_stream_ = end_stream;
if (data.length() > 0 || end_stream) {
// 寫入緩沖
write_buffer_->move(data);
if (!connecting_) {
//觸發寫事件
file_event_->activate(Event::FileReadyType::Write);
}
}
}
在寫入事件后會調用 onWriteReady
。 onWriteReady
先判斷是否已連接,未連接會調用 connect
連接事件。連接成功后發送數據並統計信息,連接失敗關閉 socket。
void ConnectionImpl::onWriteReady() {
... ...
if (connecting_) {
... ...
if (error == 0) {
connecting_ = false;
//socket 未連接,調用connect。
transport_socket_->onConnected();
... ...
// 發送數據
IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
uint64_t new_buffer_size = write_buffer_->length();
//更新統計信息
updateWriteBufferStats(result.bytes_processed_, new_buffer_size);
... ...
}
ClientConnectionImpl
ClientConnectionImpl
是客戶端的連接,其繼承自 ConnectionImpl
和 ClientConnection
。ClientConnectionImpl
只是在 Connection
的基礎上只增加了一個 connect 的接口。
connect 函數內最主要做的就是調用 connect() 連接。
void ClientConnectionImpl::connect() {
// 連接服務器
const Api::SysCallIntResult result = socket_->remoteAddress()->connect(ioHandle().fd());
if (result.rc_ == 0) {
// write will become ready.
ASSERT(connecting_);
} else {
... ...
}