轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com
源碼版本是1.19
這一篇是講service,但是基礎使用以及基本概念由於官方實在是寫的比較完整了,我沒有必要復述一遍,所以還不太清楚的小伙伴們可以去看官方的文檔:https://kubernetes.io/docs/concepts/services-networking/service/。
IPVS 概述
在 Kubernetes 集群中,每個 Node 運行一個 kube-proxy
進程。kube-proxy
負責為 Service 實現了一種 VIP(虛擬 IP)的形式。
從官方文檔介紹來看:
從 Kubernetes v1.0 開始,您已經可以使用 userspace 代理模式。 Kubernetes v1.1 添加了 iptables 模式代理,在 Kubernetes v1.2 中,kube-proxy 的 iptables 模式成為默認設置。 Kubernetes v1.8 添加了 ipvs 代理模式。
現在我們看的源碼是基於1.19,所以現在默認是ipvs代理模式。
LVS是國內章文嵩博士開發並貢獻給社區的,主要由ipvs和ipvsadm組成。
ipvs是工作在內核態的4層負載均衡,基於內核底層netfilter實現,netfilter主要通過各個鏈的鈎子實現包處理和轉發。ipvs由ipvsadm提供簡單的CLI接口進行ipvs配置。由於ipvs工作在內核態,只處理四層協議,因此只能基於路由或者NAT進行數據轉發,可以把ipvs當作一個特殊的路由器網關,這個網關可以根據一定的算法自動選擇下一跳。
IPVS vs IPTABLES
- iptables 使用鏈表,ipvs 使用哈希表;
- iptables 只支持隨機、輪詢兩種負載均衡算法而 ipvs 支持的多達 8 種;
- ipvs 還支持 realserver 運行狀況檢查、連接重試、端口映射、會話保持等功能。
IPVS用法
IPVS可以通過ipvsadm 命令進行配置,如-L列舉,-A添加,-D刪除。
如下命令創建一個service實例172.17.0.1:32016
,-t
指定監聽的為TCP
端口,-s
指定算法為輪詢算法rr(Round Robin),ipvs支持簡單輪詢(rr)、加權輪詢(wrr)、最少連接(lc)、源地址或者目標地址散列(sh、dh)等10種調度算法。
ipvsadm -A -t 172.17.0.1:32016 -s rr
在添加調度算法的時候還需要用-r指定server地址,-w指定權值,-m指定轉發模式,-m設置masquerading表示NAT模式(-g為gatewaying
,即直連路由模式),如下所示:
ipvsadm -a -t 172.17.0.1:32016 -r 10.244.1.2:8080 -m -w 1
ipvsadm -a -t 172.17.0.1:32016 -r 10.244.1.3:8080 -m -w 1
ipvsadm -a -t 172.17.0.1:32016 -r 10.244.3.2:8080 -m -w 1
Service ClusterIP原理
不清楚iptables調用鏈的同學可以先看看:https://www.zsythink.net/archives/1199了解一下。
我們這里使用上一篇HPA的一個例子:
apiVersion: apps/v1
kind: Deployment
metadata:
name: hpatest
spec:
replicas: 1
selector:
matchLabels:
app: hpatest
template:
metadata:
labels:
app: hpatest
spec:
containers:
- name: hpatest
image: nginx
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args: ["-c","/usr/sbin/nginx; while true;do echo `hostname -I` > /usr/share/nginx/html/index.html; sleep 120;done"]
ports:
- containerPort: 80
resources:
requests:
cpu: 1m
memory: 100Mi
limits:
cpu: 3m
memory: 400Mi
---
apiVersion: v1
kind: Service
metadata:
name: hpatest-svc
spec:
selector:
app: hpatest
ports:
- port: 80
targetPort: 80
protocol: TCP
創建好之后,看看svc:
# kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
hpatest-svc ClusterIP 10.68.196.212 <none> 80/TCP 2m44s
然后我們查看ipvs配置情況:
# ipvsadm -S -n | grep 10.68.196.212
-A -t 10.68.196.212:80 -s rr
-a -t 10.68.196.212:80 -r 172.20.0.251:80 -m -w 1
-S表示輸出所保存的規則,-n表示以數字的形式輸出ip和端口。可以看到ipvs的LB IP為ClusterIP,算法為rr,RS為Pod的IP。使用的模式為NAT模式。
當我們創建Service之后,kube-proxy 首先會在宿主機上創建一個虛擬網卡(叫作:kube-ipvs0),並為它分配 Service VIP 作為 IP 地址,如下所示:
# ip addr show kube-ipvs0
7: kube-ipvs0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN group default
link/ether 12:bb:85:91:96:4d brd ff:ff:ff:ff:ff:ff
...
inet 10.68.196.212/32 brd 10.68.196.212 scope global kube-ipvs0
valid_lft forever preferred_lft forever
下面看看ClusterIP如何傳遞的。使用命令:iptables -t nat -nvL可以看到由很多Chain,ClusterIP訪問方式為:
PREROUTING --> KUBE-SERVICES --> KUBE-CLUSTER-IP --> INPUT --> KUBE-FIREWALL --> POSTROUTING
當使用命令鏈接服務時:
curl 10.68.196.212:80
由於10.96.54.11就在本地,所以會以這個IP作為出口地址,即源IP和目標IP都是10.96.54.11,此時相當於:
10.68.196.212:xxxx -> 10.68.196.212:80
然后經過ipvs,ipvs會從RS ip列中選擇其中一個Pod ip作為目標IP:
10.68.196.212:xxxx -> 10.68.196.212:80
|
| IPVS
v
172.20.0.251:xxxx -> 172.20.0.251:80
查看OUTPUT規則:
# iptables-save
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A KUBE-SERVICES ! -s 172.20.0.0/16 -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
如上規則的意思就是除了Pod以外訪問ClusterIP的包都打上0x4000/0x4000
。
到了POSTROUTING鏈:
-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
如上規則的意思就是只要匹配mark0x4000/0x4000
的包都做SNAT,由於172.20.0.251是從flannel.1出去的,因此源ip會改成flannel.1的ip 172.20.0.0
:
10.68.196.212:xxxx -> 10.68.196.212:80
|
| IPVS
v
10.68.196.212:xxxx -> 172.20.0.251:80
|
| MASQUERADE
v
172.20.0.0:xxxx -> 172.20.0.251:80
最后通過VXLAN隧道發到Pod的Node上,轉發給Pod的veth,回包通過路由到達源Node節點,源Node節點通過之前的MASQUERADE再把目標IP還原為172.20.0.251。
kube-proxy ipvs 源碼分析
初始化ipvs
文件位置:cmd/kube-proxy/app/server_others.go
kube-proxy啟動的時候會調用NewProxyServer初始化ipvs 代理:
func newProxyServer(
config *proxyconfigapi.KubeProxyConfiguration,
cleanupAndExit bool,
master string) (*ProxyServer, error) {
...
//獲取代理模式userspace iptables ipvs
proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})
...
//代理模式是iptables
if proxyMode == proxyModeIPTables {
...
// 代理模式是ipvs
} else if proxyMode == proxyModeIPVS {
klog.V(0).Info("Using ipvs Proxier.")
//判斷是夠啟用了 ipv6 雙棧
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
...
} else {
...
//初始化 ipvs 模式的 proxier
proxier, err = ipvs.NewProxier(
...
)
}
...
} else {
...
}
return &ProxyServer{
...
}, nil
}
NewProxyServer方法會根據proxyMode來選擇是IPVS還是IPTables,ipvs會調用ipvs.NewProxier方法來初始化一個proxier。
NewProxier
func NewProxier(... ) (*Proxier, error) {
...
//對於 SNAT iptables 規則生成 masquerade 標記
masqueradeValue := 1 << uint(masqueradeBit)
...
//設置默認調度算法 rr
if len(scheduler) == 0 {
klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
scheduler = DefaultScheduler
}
// healthcheck服務器對象創建
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
...
//初始化 proxier
proxier := &Proxier{
...
}
//初始化 ipset 規則
proxier.ipsetList = make(map[string]*IPSet)
for _, is := range ipsetInfo {
proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
}
burstSyncs := 2
klog.V(2).Infof("ipvs(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d",
ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs)
//初始化 syncRunner
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
//啟動 gracefuldeleteManager
proxier.gracefuldeleteManager.Run()
return proxier, nil
}
這個方法主要做了如下幾件事:
- 對於 SNAT iptables 規則生成 masquerade 標記;
- 設置默認調度算法 rr;
- healthcheck服務器對象創建;
- 初始化 proxier;
- 初始化 ipset 規則;
- 初始化 syncRunner;
- 啟動 gracefuldeleteManager;
這個方法在初始化syncRunner的時候設置了proxier.syncProxyRules方法作為一個參數構建了同步運行器syncRunner。
調用同步運行器
文件位置:cmd/kube-proxy/app/server.go
func (s *ProxyServer) Run() error {
...
//調用ipvs的SyncLoop方法
go s.Proxier.SyncLoop()
return <-errCh
}
kube-proxy在啟動的時候會初始化完ProxyServer 對象后,會調用runLoop方法,然后調用到ProxyServer的Run方法中,最后調用ipvs的SyncLoop方法。
func (proxier *Proxier) SyncLoop() {
...
proxier.syncRunner.Loop(wait.NeverStop) //執行NewBoundedFrequencyRunner對象Loop
}
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
return
case <-bfr.timer.C(): //定時器方式執行
bfr.tryRun()
case <-bfr.run: //按需方式執行(發送運行指令信號)
bfr.tryRun()
}
}
}
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
//限制條件允許運行func
if bfr.limiter.TryAccept() {
bfr.fn() // 重點執行部分,調用func,上下文來看此處就是
// 對syncProxyRules()的調用
bfr.lastRun = bfr.timer.Now() // 記錄運行時間
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval) // 重設下次運行時間
klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
return
}
//限制條件不允許運行,計算下次運行時間
elapsed := bfr.timer.Since(bfr.lastRun) // elapsed:上次運行時間到現在已過多久
nextPossible := bfr.minInterval - elapsed // nextPossible:下次運行至少差多久(最小周期)
nextScheduled := bfr.maxInterval - elapsed // nextScheduled:下次運行最遲差多久(最大周期)
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
if nextPossible < nextScheduled {
bfr.timer.Stop()
bfr.timer.Reset(nextPossible)
klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible)
}
}
SyncLoop方法會調用到proxier的syncRunner實例設置的syncProxyRules方法。
同步運行器
syncProxyRules方法比較長,所以這里就分開來一步步的講,跟好代碼節奏來就好了。
代碼位置:pkg/proxy/ipvs/proxier.go
//更新 service 與 endpoint變化信息
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// 合並 service 列表
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
}
}
}
這里是同步與新更service和endpoints,然后 合並 service 列表。
//nat鏈
proxier.natChains.Reset()
//nat規則
proxier.natRules.Reset()
//filter鏈
proxier.filterChains.Reset()
//filter規則
proxier.filterRules.Reset()
// Write table headers.
writeLine(proxier.filterChains, "*filter")
writeLine(proxier.natChains, "*nat")
//創建kubernetes的表連接鏈數據
proxier.createAndLinkeKubeChain()
這里會重置鏈表規則,然后調用createAndLinkeKubeChain方法創建kubernetes的表連接鏈數據,下面我們看看createAndLinkeKubeChain方法:
func (proxier *Proxier) createAndLinkeKubeChain() {
//通過iptables-save獲取現有的filter和NAT表存在的鏈數據
existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter)
existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT)
// Make sure we keep stats for the top-level chains
//里面保存了NAT表鏈和Filter表鏈
// NAT表鏈: KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL
// KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ
// Filter表鏈: KUBE-FORWARD
for _, ch := range iptablesChains {
//不存在則創建鏈,創建頂層鏈
if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil {
klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err)
return
}
//nat表寫鏈
if ch.table == utiliptables.TableNAT {
if chain, ok := existingNATChains[ch.chain]; ok {
writeBytesLine(proxier.natChains, chain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
}
// filter表寫鏈
} else {
if chain, ok := existingFilterChains[KubeForwardChain]; ok {
writeBytesLine(proxier.filterChains, chain)
} else {
writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain))
}
}
}
// 默認鏈下創建kubernete服務專用跳轉規則
// iptables -I OUTPUT -t nat --comment "kubernetes service portals" -j KUBE-SERVICES
// iptables -I PREROUTING -t nat --comment "kubernetes service portals" -j KUBE-SERVICES
// iptables -I POSTROUTING -t nat --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
// iptables -I FORWARD -t filter --comment "kubernetes forwarding rules" -j KUBE-FORWARD
for _, jc := range iptablesJumpChain {
args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)}
if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil {
klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err)
}
}
}
createAndLinkeKubeChain方法首先會獲取現存的filter和NAT表,然后再遍歷iptablesChains。
iptablesChains里面保存了NAT表鏈和Filter表鏈:NAT表鏈 KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ;Filter表鏈 KUBE-FORWARD;
然后再根據iptablesJumpChain創建跳轉規則。
下面回到syncProxyRules往下走。
// 創建 dummy interface kube-ipvs0
_, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
return
}
// 創建默認的 ipset 規則,http://ipset.netfilter.org/
for _, set := range proxier.ipsetList {
if err := ensureIPSet(set); err != nil {
return
}
set.resetEntries()
}
設置默認Dummy接口,並確定ipsets規則已存在的集合,ipset相關可以看:http://ipset.netfilter.org/。
下面會遍歷proxier.serviceMap,對每一個服務創建 ipvs 規則,比較長,也分開說。
for svcName, svc := range proxier.serviceMap {
...
//基於此服務的有效endpoint列表,更新KUBE-LOOP-BACK的ipset集,以備后面生成相應iptables規則(SNAT偽裝地址)
for _, e := range proxier.endpointsMap[svcName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
continue
}
if !ep.IsLocal {
continue
}
epIP := ep.IP()
epPort, err := ep.Port()
// Error parsing this endpoint has been logged. Skip to next endpoint.
if epIP == "" || err != nil {
continue
}
entry := &utilipset.Entry{
IP: epIP,
Port: epPort,
Protocol: protocol,
IP2: epIP,
SetType: utilipset.HashIPPortIP,
}
// 校驗KUBE-LOOP-BACK集合entry記錄項
if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name))
continue
}
// 插入此entry記錄至active記錄隊列
proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
}
...
}
這一段是根據有效endpoint列表,更新KUBE-LOOP-BACK的ipset集,以備后面生成相應iptables規則(SNAT偽裝地址);
for svcName, svc := range proxier.serviceMap {
...
//構建ipset entry
entry := &utilipset.Entry{
IP: svcInfo.ClusterIP().String(),
Port: svcInfo.Port(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
// 類型校驗ipset entry
if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name))
continue
}
// 名為KUBE-CLUSTER-IP的ipset集插入entry,以備后面統一生成IPtables規則
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
// ipvs call
// 構建ipvs虛擬服務器VS服務對象
serv := &utilipvs.VirtualServer{
Address: svcInfo.ClusterIP(),
Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler,
}
// Set session affinity flag and timeout for IPVS service
// 設置IPVS服務的會話保持標志和超時時間
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
// 將clusterIP綁定至dummy虛擬接口上,syncService()處理中需置bindAddr地址為True
if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
//同步endpoints信息,IPVS為VS更新realServer
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
}
} else {
klog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}
...
}
ipset集KUBE-CLUSTER-IP更新,以備后面生成相應iptables規則。
for svcName, svc := range proxier.serviceMap {
...
//為 load-balancer類型創建 ipvs 規則
for _, ingress := range svcInfo.LoadBalancerIPStrings() {
if ingress != "" {
// 構建ipset entry
entry = &utilipset.Entry{
IP: ingress,
Port: svcInfo.Port(),
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name))
continue
}
//KUBE-LOAD-BALANCER ipset集更新
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
//服務指定externalTrafficPolicy=local時,KUBE-LOAD-BALANCER-LOCAL ipset集更新
if svcInfo.OnlyNodeLocalEndpoints() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name))
continue
}
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
}
// 服務的LoadBalancerSourceRanges被指定時,基於源IP保護的防火牆策略開啟,KUBE-LOAD-BALANCER-FW ipset集更新
if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name))
continue
}
proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() {
// ipset call
entry = &utilipset.Entry{
IP: ingress,
Port: svcInfo.Port(),
Protocol: protocol,
Net: src,
SetType: utilipset.HashIPPortNet,
}
// 枚舉所有源CIDR白名單列表,KUBE-LOAD-BALANCER-SOURCE-CIDR ipset集更新
//cidr:https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr
if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name))
continue
}
proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
// ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src)
if cidr.Contains(proxier.nodeIP) {
allowFromNode = true
}
}
...
}
// ipvs call
// 構建ipvs 虛擬主機對象
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress),
Port: uint16(svcInfo.Port()),
Protocol: string(svcInfo.Protocol()),
Scheduler: proxier.ipvsScheduler,
}
...
}
}
...
}
這里為 load-balancer類型創建 ipvs 規則,LoadBalancerSourceRanges和externalTrafficPolicy=local被指定時將對KUBE-LOAD-BALANCER-LOCAL、KUBE-LOAD-BALANCER-FW、KUBE-LOAD-BALANCER-SOURCE-CIDR、KUBE-LOAD-BALANCER-SOURCE-IP ipset集更新,以備后面生成相應iptables規則。
...
//同步 ipset 記錄,清理 conntrack
for _, set := range proxier.ipsetList {
set.syncIPSetEntries()
}
//創建 iptables 規則數據
proxier.writeIptablesRules()
// 合並iptables規則
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
//基於iptables格式化規則數據,使用iptables-restore刷新iptables規則
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
...
最后這里會刷新iptables 規則,然后創建 iptables 規則數據,將合並iptables規則,iptables-restore刷新iptables規則。
總結
這一篇沒有怎么講service是怎么運行的,怎么使用的,而是選擇講了kube-proxy的ipvs代理是怎么做的,以及在開頭講了ipvs與iptables區別與關系,看不懂的同學需要自己去補充一下iptables相關的知識,文中的 ipvs 的知識我也是現學的,如果有講解不好的地方歡迎指出。
Reference
https://kubernetes.io/docs/tasks/administer-cluster/dns-debugging-resolution/
https://kubernetes.io/docs/tasks/debug-application-cluster/debug-service/
https://kubernetes.io/docs/concepts/services-networking/service/
https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr
https://github.com/kubernetes/kubernetes/tree/master/pkg/proxy/ipvs
https://zh.wikipedia.org/zh-cn/網絡地址轉換