最近手上有點時間,打算繼續了解下go-micro的發布訂閱(消息),看了micro的examples后,有個疑問,go-micro在提供發布訂閱的插件Broker(以及幾種實現)的同時,go-micro本身還實現了Publish(Client)以及Subscribe(Server)功能,於是翻了下源碼,做個記錄。
Broker
Broker是go-micro定義的一個異步消息的接口,同時使用插件的形式,可隨意在不同的實現(http,nats,rabbitmq)之間無縫切換。
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
從上面的接口可以看出,使用Broker來完成發布訂閱只需要以下幾步:
- 初始化一個Broker(
Init
) - 連接Broker(
Connect
) - 使用准備好的Broker發布/訂閱(
Publish/Subscribe
) - 關閉Broker(
Disconnect
)
go-micro中默認的broker實現
go-micro默認有基於http的Broker實現,可以直接使用。micro有給出具體的example,具體看下source code中的實現。
下面是go-micro中broer.go中對DefaultBroker的相關code:
var (
DefaultBroker Broker = NewBroker()
)
func Init(opts ...Option) error {
return DefaultBroker.Init(opts...)
}
func Connect() error {
return DefaultBroker.Connect()
}
func Disconnect() error {
return DefaultBroker.Disconnect()
}
func Publish(topic string, msg *Message, opts ...PublishOption) error {
return DefaultBroker.Publish(topic, msg, opts...)
}
func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, handler, opts...)
}
func String() string {
return DefaultBroker.String()
}
可以看到都是基於NewBroker()
返回的broker實例來做的公用方法封裝,我們進一步看看。
// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {
return newHttpBroker(opts...)
}
這里是直接返回了一個http實現的broker(和上面提到的默認是基於http實現的匹配),繼續跟newHttpBroker
。
這里這列出部分code,詳細的可直接參考go-micro下的http.go
h := &httpBroker{
id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
這里的核心是new了一個httpBroker,做為Broker接口的實現,在具體的實現就不在這里說了,下來我們看看上面提到接口的實現。
Init
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{
Transport: newTransport(c),
}
}
return nil
}
從上面的code中可以看到,Init的作用就是初始化各種配置,如果Option參數有提供,就是用參數提供的,如果沒有就在這里設置一個,這里有2個點我們需要額外關注下:
-
Registry
Registry是注冊中心,如果option中沒有提供registry,就會使用go-micro默認實現的(msdn)
-
TLSConfig
TLSConfig是針對https的配置,默認是http
Connect
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {
h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()
}()
// get registry
reg := h.opts.Registry
if reg == nil {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
Connect方法的主要作用是創建一個Htto Server用來接收Publish時發送的消息
Subscribe
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {
return nil, err
}
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := ®istry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := ®istry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
這部分代碼的核心功能就是創建用於訂閱的server,一個topic創建一個server並收集(注冊)到httpSubscriber的svc列表中(發布消息時使用topic在subscriber的svc列表中查詢到對應的server給他發送消息)。
Publish
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {
return err
}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
scheme = "https"
}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode ())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {
var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {
continue
}
// look for nodes for the topic
if node.Metadata["topic"] != topic {
continue
}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {
success = true
}
}
// save if it failed to publish at least once
if !success {
h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {
time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
看過了上面的Subscribe
實現,這里的Publish
就比較簡單
- 創建消息體並存儲在inbox
- 根據topic以及broker的標簽(這里是固定http)來查找訂閱的server(在上面訂閱模塊創建的)
上面有可能會查找出多個node(訂閱server),所以里面還有一個版本的機制,如果指定了版本就會給所有的匹配節點發送(默認是隨機發送一個)
- 使用http post的方式(異步)把消息發送出去
Disconnect
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
這部分功能很簡單,清空緩存並發送退出的消息,同時停止服務
以上就是go-micro中默認基於http的broker實現。
go-micro中對於broker的包裝
在看完broker的http默認實現后,我們對於broker有了一個大體了解,接下來我們在看下go-micro對於broker做的包裝部分,應該是為了簡化使用(確實只需要一步就可以)。
訂閱RegisterSubscriber
:
func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.srv.pubsub"),
)
// parse command line
service.Init()
// register subscriber
micro.RegisterSubscriber("example.topic.pubsub.1", service.Server(), new(Sub))
// register subscriber with queue, each message is delivered to a unique subscriber
micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv, server.SubscriberQueue("queue.pubsub"))
if err := service.Run(); err != nil {
log.Fatal(err)
}
}
發布NewPublisher, Publish
:
func main() {
// create a service
service := micro.NewService(
micro.Name("go.micro.cli.pubsub"),
)
// parse command line
service.Init()
// create publisher
pub1 := micro.NewPublisher("example.topic.pubsub.1", service.Client())
pub2 := micro.NewPublisher("example.topic.pubsub.2", service.Client())
// pub to topic 1
go sendEv("example.topic.pubsub.1", pub1)
// pub to topic 2
go sendEv("example.topic.pubsub.2", pub2)
// block forever
select {}
}
以上只是代碼節選,具體使用方法可以參考example中的pubsub。
Subscriber
訂閱對比直接用Broker只需要一步RegisterSubscriber
,我們看看里面實現
//go-micro/micro.go
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
//go-micro/server/rpc_server.go
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return s.router.NewSubscriber(topic, sb, opts...)
}
func (s *rpcServer) Subscribe(sb Subscriber) error {
s.Lock()
defer s.Unlock()
if err := s.router.Subscribe(sb); err != nil {
return err
}
s.subscribers[sb] = nil
return nil
}
//go-micro/server/rpc_router.go
// router represents an RPC router.
type router struct {
.......
subscribers map[string][]*subscriber
}
//go-micro/server/subscriber.go
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
opts SubscriberOptions
}
上面的節選code可以看出,在默認server(rpcServer)中的router中定義了個map類型的變量subscribers
用來存儲訂閱的topic和對應處理的subscriber
,server在接收到消息后,只需要根據topic去map中找到subscriber,去處理即可。
subscriber中具體的處理,可以從定義中看出來,里面存儲對應路由和響應的handler(server本身的功能),有興趣可以在go-micro/server/subscriber.go看看具體代碼實現。
Publisher
發布是在go-micro的默認client實現(rpc_client)里面定義了一個默認的broker(上面有分析過的http實現)
//go-micro/micro.go
// Deprecated: NewPublisher returns a new Publisher
func NewPublisher(topic string, c client.Client) Event {
return NewEvent(topic, c)
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
if c == nil {
c = client.NewClient()
}
return &event{c, topic}
}
//go-micro/event.go
type event struct {
c client.Client
topic string
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
}
這里可以看到實際上是使用傳遞進來的client來初始化一個event,並用來發送消息,如果傳遞的是空,默認創建一個client(rpcClient
)。
總結
經過以上過程的追蹤,最終總結下來就幾點:
- broker定義了接口,micro提供的插件的形式可無縫替換實現
- go-micro提供了一個默認的broker實現,是基於http
- go-micro的基於默認的server、client以及brkoer包裝了一套更簡單的pub和sub方法