skynet集群 --- cluster 模式


  skynet本身解決的核心問題是充分利用同一台機器的多核的處理能力。雲風在描述集群時,強調說skynet只提供了構建集群的組件。那是因為不是所有項目遇到的問題都能夠用統一的解決方案的。還提出任何企圖抹平服務運行位置差異的設計都需要慎重考慮,很可能存在設計問題,因為集群協作不與單機多服務工作,集群中可能對方的服務並未啟動,而單機工作中,可以認為不會只有一部分出錯(即不會說當前功能正在運行,但是本機目標服務未啟動的情況)。

  skynet自帶兩種集群基礎架構,我們接下來要拜讀的是cluster 方式。

  相對於master/slave 模式,cluster模式有較大的彈性。它不需要配置harbor_id,harbor_id需要填0,那么就意味着,集群節點不限制於256個。

  跟master/slave 不同,cluster的配置需要在每台機子上(或者緩存到redis中,每台機子獲取到的配置表需要一致)配置,需要新建一份文件,如skynet示例,又或者在cluster.reload時傳入一個table配置

1 __nowaiting = true    -- If you turn this flag off, cluster.call would block when node name is absent
2 
3 db = "127.0.0.1:2528"
4 db2 = "127.0.0.1:2529"
1 -- cluster1.lua
2 skynet.start(function()
3     cluster.reload {
4         db = "127.0.0.1:2528",
5         db2 = "127.0.0.1:2529",
6     }
7         ...
8 end)

  接下來我們主要看幾個問題:集群的啟動過程,cluster.send()與cluster.call()的執行過程。

cluster集群的啟動過程

  當我們需要使用cluster模式時,我們必須要在一個地方進行 require "cluster",可以在main里,也可以在項目中的負載均衡里等等地方。當require "cluster"時,會執行如下代碼:

1 -- cluster.lua
2 skynet.init(function()
3     clusterd = skynet.uniqueservice("clusterd")
4 end)

  在init中,會啟動一個 clusterd的服務,用來對集群的維護,其實cluster.lua只是對集群操作的一個淺封裝。

  在require啟動完clusterd服務后,需要進行 cluster.reload({})的操作,對集群中的機子進行初始化,並且啟動當前節點對配置表中的機子的 clustersender 發送服務,也就是說當前節點會有n個集群節點的發送服務(有多少個集群節點,就有多少個服務),當reload完成后,需要對各節點的監聽操作(cluster.open "name"),這里cluster.open的主要操作是生成一個對應目標機子的gateserver並監聽端口,如果有消息觸發gateserver,則表示對應的監聽機子有消息過來,如果是對方連接過來,則會生成一個對應的clusteragent(接收消息服務):

 1 -- cluster1.lua
 2 skynet.start(function()
 3     cluster.reload {
 4         db = "127.0.0.1:2528",
 5         db2 = "127.0.0.1:2529",
 6     }
 7         ... 
 8     cluster.open "db"
 9     cluster.open "db2"
10         ...
11 end)

  首先看下對集群節點發送消息的服務的創建與其功能:

 1 -- clustersender.lua
 2 skynet.start(function()
 3     channel = sc.channel {
 4             host = init_host,
 5             port = tonumber(init_port),
 6             response = read_response,
 7             nodelay = true,
 8         }
 9     skynet.dispatch("lua", function(session , source, cmd, ...)
10         local f = assert(command[cmd])
11         f(...)
12     end)
13 end)
View Code

  注意,此時只是創建了clustersender的服務,並沒有與對應的機子進行open操作,實際上與對應節點建立連接是在對節點進行通訊的時候才會去判斷是否已經連接成功,如果沒有連接則進行連接,這里每次通訊都會去判斷是因為連接是tcp的,可能會中斷。在創建服務時,可以看到對socketchannel 的對象進行了初始化,host與port是必須要初始化的,這里的response進行賦值是為了告訴channel是使用哪種通信邏輯,具體的socketchannel支持的通信邏輯,可查看官方wiki:SocketChannel · cloudwu/skynet Wiki · GitHub

  同理,對節點進行接收消息的 clusteragent 服務的創建大同小異。

  現在,對每個節點都創建了 clustersender, clusteragent 兩個服務后,我們看下cluster.call的執行過程是怎樣的。

cluster.call 的執行過程

  當執行cluster.call時,首先先訪問當前緩存是否有對應節點的發送服務信息,如果沒有先將此次消息插入到節點的消息隊列中,然后會跟clusterd獲取對應clustersender,如果對應服務未啟動,那么會觸發連接去新建一個tcp連接的服務 ,如果創建失敗則會將所有消息丟棄掉,如果服務創建完成,則會對所有消息進行依次執行,如果本身已經有緩存信息,則直接給發送服務發送 req 命令:

 1 -- cluster.lua
 2 function cluster.call(node, address, ...)
 3     -- skynet.pack(...) will free by cluster.core.packrequest
 4     local s = sender[node]
 5     if not s then
 6         local task = skynet.packstring(address, ...)
 7         return skynet.call(get_sender(node), "lua", "req", repack(skynet.unpack(task)))
 8     end
 9     return skynet.call(s, "lua", "req", address, skynet.pack(...))
10 end
View Code

  clustersender 發送消息服務接收req命令並執行操作,首先會對消息進行編碼,然后才執行發送操作:

 1 -- clustersender.lua
 2 local function send_request(addr, msg, sz)
 3     -- msg is a local pointer, cluster.packrequest will free it
 4     local current_session = session
 5     local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)        -- 對消息進行打包,padding是區分消息是否為長消息,大於32k為長消息
 6 ...
 7     return channel:request(request, current_session, padding)                -- 對消息發送
 8 end
 9 
10 function command.req(...)
11     local ok, msg = pcall(send_request, ...)
12 ...
13 end
View Code

 

  chanel:request 函數,主要是對消息的發送,然后區分使用的模式是否為一問一答,還是不要求每一個請求都有一個回應進行,但是兩種模式都會觸發等待消息的情況:

 1 -- socketsender.lua
 2 local function push_response(self, response, co)
 3     if self.__response then
 4         -- response is session
 5         self.__thread[response] = co
 6     else
 7         -- response is a function, push it to __request
 8         table.insert(self.__request, response)
 9         table.insert(self.__thread, co)
10         if self.__wait_response then
11             skynet.wakeup(self.__wait_response)
12             self.__wait_response = nil
13         end
14     end
15 end
16 
17 local function wait_for_response(self, response)
18     local co = coroutine.running()
19     push_response(self, response, co)
20     skynet.wait(co)
21     ...
22 end
23 
24 function channel:request(request, response, padding)
25     assert(block_connect(self, true))    -- connect once
26     local fd = self.__sock[1]
27 ...
28     return wait_for_response(self, response)
29 end
View Code

  執行channel:request 會掛起當前協程等待消息(而channel在connect時就會fork一個協程進行消息的分發。),對應節點會在clusteragent服務接收到消息並且執行,然后將消息原路返回給原節點。

  節點收到消息回應,如果是dispatch_by_session則會根據session獲取到對應的協程然后喚起,使得之前的wait_for_response協程進行消息並且返回消息給對應的調用者。

 1 -- socketchannel.lua
 2 local function connect_once(self)
 3 ...
 4 self.__dispatch_thread = skynet.fork(function()
 5             pcall(dispatch_function(self), self)
 6             -- clear dispatch_thread
 7             self.__dispatch_thread = nil
 8         end)
 9 ...
10 end
View Code

  以上則是cluster.call的調用過程。

cluster.send 的執行過程

  相對於cluster.call而言,send的主要區別則是在channel:request時會直接返回函數,不會掛起協程進行等待。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM