之前我們通過Watch機制實現了簡化版本的監視服務,這里我們基於這種機制實現一個發布訂閱模式,但是因為RPC缺乏流機制導致每次只能返回一個結果,在發布訂閱模式中,由調用者主動發起的發布行為類似於一個普通函數調用,而被動的訂閱者則類似gRPC客戶端單向流中的接收者。現在我們可以嘗試基於gRPC的流特性構造一個發布訂閱系統。
首先我們需要使用一個第三方模塊:go get github.com/docker/docker, 下面我們寫一個簡單的訂閱模式demo:
package main import ( "fmt" "github.com/docker/docker/pkg/pubsub" "strings" "time" ) func main(){ p := pubsub.NewPublisher(100*time.Microsecond, 10) golang := p.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { if strings.HasPrefix(key, "golang:") { return true } } return false }) docker := p.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { if strings.HasPrefix(key, "docker:") { return true } } return false }) go p.Publish("wang") go p.Publish("golang: https://golang.org") go p.Publish("docker: https://www.docker.com") time.Sleep(time.Second*2) go func() { fmt.Println("golang topic:", <-golang) }() go func() { fmt.Println("docker topic:",<-docker) }() time.Sleep(time.Second*3) fmt.Println("end") }
以上代碼運行后,會通過我們的訂閱過濾函數:p.SubscribeTopic過濾調我們不是訂閱的信息,最終打印出相關的結果。
gRPC發布訂閱實例
我們分別需要有一個proto文件定義服務端和客戶端的接口實現,里面定義了傳輸數據類型和實現方法,緊接着我們需要一個服務端,它用來支撐起整個服務給所有的客戶端訪問,再然后我們需要兩個客戶端,一個發布一個訂閱(先訂閱)。
proto/publish.proto文件:
syntax="proto3"; package proto; message StringPub{ string value =1; } service PubsubService { // 發布是rpc的普通方法 rpc Publish (StringPub) returns (StringPub); // 訂閱則是一個單向的流服務,服務端返回的數據可能很大 rpc Subscribe (StringPub) returns (stream StringPub); }
我們使用:protoc --go_out=plugins=grpc:. publish.proto 生成相應的go文件,重點分析:
// 客戶端接受體 type pubsubServiceClient struct { cc *grpc.ClientConn } // 客戶端調用它生成接受體 func NewPubsubServiceClient(cc *grpc.ClientConn) PubsubServiceClient { return &pubsubServiceClient{cc} } // 客戶端的方法實現 func (c *pubsubServiceClient) Publish(ctx context.Context, in *StringPub, opts ...grpc.CallOption) (*StringPub, error) { out := new(StringPub) err := c.cc.Invoke(ctx, "/proto.PubsubService/Publish", in, out, opts...) if err != nil { return nil, err } return out, nil } func (c *pubsubServiceClient) Subscribe(ctx context.Context, in *StringPub, opts ...grpc.CallOption) (PubsubService_SubscribeClient, error) { stream, err := c.cc.NewStream(ctx, &_PubsubService_serviceDesc.Streams[0], "/proto.PubsubService/Subscribe", opts...) if err != nil { return nil, err } x := &pubsubServiceSubscribeClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } if err := x.ClientStream.CloseSend(); err != nil { return nil, err } return x, nil } // 返回用於發送接受的對象,類似socket type PubsubService_SubscribeClient interface { Recv() (*StringPub, error) grpc.ClientStream } type pubsubServiceSubscribeClient struct { grpc.ClientStream } ... func (x *pubsubServiceSubscribeClient) Recv() (*StringPub, error) { m := new(StringPub) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } return m, nil } // PubsubServiceServer is the server API for PubsubService service. type PubsubServiceServer interface { // 發布是rpc的普通方法 Publish(context.Context, *StringPub) (*StringPub, error) // 訂閱則是一個單向的流服務,服務端返回的數據可能很大 Subscribe(*StringPub, PubsubService_SubscribeServer) error } func RegisterPubsubServiceServer(s *grpc.Server, srv PubsubServiceServer) { s.RegisterService(&_PubsubService_serviceDesc, srv) } type PubsubService_SubscribeServer interface { Send(*StringPub) error grpc.ServerStream }
接着我們可以實現一個發布的客戶端:pub_service.go:
package main import ( "context" "gRPC_demo/proto" "google.golang.org/grpc" "log" ) func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := proto.NewPubsubServiceClient(conn) _, err = client.Publish(context.Background(), &proto.StringPub{Value: "golang: hello Wang"}) if err != nil { log.Fatal(err) } _, err = client.Publish(context.Background(), &proto.StringPub{Value: "docker: hello Wang"}) if err != nil { log.Fatal(err) } }
最后我們實現訂閱客戶端:sub_client.go:
package main import ( "context" "fmt" "gRPC_demo/proto" "google.golang.org/grpc" "io" "log" ) func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := proto.NewPubsubServiceClient(conn) stream, err := client.Subscribe(context.Background(),&proto.StringPub{Value:"golang: "}) if err != nil { log.Fatal(err) } for { reply, err := stream.Recv() if err != nil { if err == io.EOF{ break } log.Fatal(err) } fmt.Println(reply.GetValue()) } }
到此發布訂閱功能就基本實現了,依次啟動服務端,訂閱客戶端,發布客戶端就可以查看結果了。
手寫發布訂閱
分別由服務端、客戶端代碼,以及一個調用文件。(此實例是摘抄於:http://www.itkeyword.com/doc/602627406483745x914/golang-pubsub)感覺寫的不錯,所以摘抄。
服務端代碼:service.go
package pubsub import ( "errors" "sync" ) type Client struct { Id int Ip string } type Server struct { Dict map[string]*Channel //map[Channel.Name]*Channel sync.RWMutex } func NewServer() *Server { s := &Server{} s.Dict = make(map[string]*Channel) //所有channel return s } //訂閱 func (srv *Server) Subscribe(client *Client, channelName string) { // 客戶是否在Channel的客戶列表中 srv.RLock() ch, found := srv.Dict[channelName] srv.RUnlock() if !found { ch = NewChannel(channelName) ch.AddClient(client) srv.Lock() srv.Dict[channelName] = ch srv.Unlock() } else { ch.AddClient(client) } } //取消訂閱 func (srv *Server) Unsubscribe(client *Client, channelName string) { srv.RLock() ch, found := srv.Dict[channelName] srv.RUnlock() if found { if ch.DeleteClient(client) == 0 { ch.Exit() srv.Lock() delete(srv.Dict, channelName) srv.Unlock() } } } //發布消息 func (srv *Server) PublishMessage(channelName, message string) (bool, error) { srv.RLock() ch, found := srv.Dict[channelName] if !found { srv.RUnlock() return false, errors.New("channelName不存在!") } srv.RUnlock() ch.Notify(message) ch.Wait() return true, nil }
客戶端代碼:client.go
import ( "fmt" "sync" "sync/atomic" ) type Channel struct { Name string clients map[int]*Client // exitChan chan int sync.RWMutex waitGroup WaitGroupWrapper messageCount uint64 exitFlag int32 } func NewChannel(channelName string) *Channel { return &Channel{ Name: channelName, // exitChan: make(chan int), clients: make(map[int]*Client), } } func (ch *Channel) AddClient(client *Client) bool { ch.RLock() _, found := ch.clients[client.Id] ch.RUnlock() ch.Lock() if !found { ch.clients[client.Id] = client } ch.Unlock() return found } func (ch *Channel) DeleteClient(client *Client) int { var ret int ch.ReplyMsg( fmt.Sprintf("從channel:%s 中刪除client:%d ", ch.Name, client.Id)) ch.Lock() delete(ch.clients, client.Id) ch.Unlock() ch.RLock() ret = len(ch.clients) ch.RUnlock() return ret } func (ch *Channel) Notify(message string) bool { ch.RLock() defer ch.RUnlock() for cid, _ := range ch.clients { ch.ReplyMsg( fmt.Sprintf("channel:%s client:%d message:%s", ch.Name, cid, message)) } return true } func (ch *Channel) ReplyMsg(message string) { ch.waitGroup.Wrap(func() { fmt.Println(message) }) } func (ch *Channel) Wait() { ch.waitGroup.Wait() } func (ch *Channel) Exiting() bool { return atomic.LoadInt32(&ch.exitFlag) == 1 } func (ch *Channel) Exit() { if !atomic.CompareAndSwapInt32(&ch.exitFlag, 0, 1) { return } //close(ch.exitChan) ch.Wait() } func (ch *Channel) PutMessage(clientID int, message string) { ch.RLock() defer ch.RUnlock() if ch.Exiting() { return } //select { // case <-t.exitChan: // return //} fmt.Println(ch.Name, ":", message) atomic.AddUint64(&ch.messageCount, 1) return }
最后是主函數文件:
package main import ( . "pubsub" ) func main(){ c1 := &Client{Id:100,Ip:"172.18.1.1"} c3:= &Client{Id:300,Ip:"172.18.1.3"} srv := NewServer() srv.Subscribe(c1,"Topic") srv.Subscribe(c3,"Topic") srv.PublishMessage("Topic","測試信息1") srv.Unsubscribe(c3,"Topic") srv.PublishMessage("Topic","測試信息2222") srv.Subscribe(c1,"Topic2") srv.Subscribe(c3,"Topic2") srv.PublishMessage("Topic2"," Topic2的測試信息") }