gRPC的發布訂閱模式


 

  之前我們通過Watch機制實現了簡化版本的監視服務,這里我們基於這種機制實現一個發布訂閱模式,但是因為RPC缺乏流機制導致每次只能返回一個結果,在發布訂閱模式中,由調用者主動發起的發布行為類似於一個普通函數調用,而被動的訂閱者則類似gRPC客戶端單向流中的接收者。現在我們可以嘗試基於gRPC的流特性構造一個發布訂閱系統。

  首先我們需要使用一個第三方模塊:go get  github.com/docker/docker, 下面我們寫一個簡單的訂閱模式demo:

package main
import (
	"fmt"
	"github.com/docker/docker/pkg/pubsub"
	"strings"
	"time"
)
func main(){
	p := pubsub.NewPublisher(100*time.Microsecond, 10)
	golang := p.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, "golang:") {
				return true
			}
		}
		return false
	})

	docker := p.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, "docker:") {
				return true
			}
		}
		return false
	})

	go p.Publish("wang")
	go p.Publish("golang: https://golang.org")
	go p.Publish("docker: https://www.docker.com")

	time.Sleep(time.Second*2)
	go func() {
		fmt.Println("golang topic:", <-golang)
	}()

	go func() {
		fmt.Println("docker topic:",<-docker)
	}()
	time.Sleep(time.Second*3)
	fmt.Println("end")
}

以上代碼運行后,會通過我們的訂閱過濾函數:p.SubscribeTopic過濾調我們不是訂閱的信息,最終打印出相關的結果。

 

gRPC發布訂閱實例

我們分別需要有一個proto文件定義服務端和客戶端的接口實現,里面定義了傳輸數據類型和實現方法,緊接着我們需要一個服務端,它用來支撐起整個服務給所有的客戶端訪問,再然后我們需要兩個客戶端,一個發布一個訂閱(先訂閱)。

proto/publish.proto文件:

syntax="proto3";

package proto;
message StringPub{
  string value =1;
}

service PubsubService {
  // 發布是rpc的普通方法
  rpc Publish (StringPub) returns (StringPub);
  // 訂閱則是一個單向的流服務,服務端返回的數據可能很大
  rpc Subscribe (StringPub) returns (stream StringPub);
}

我們使用:protoc --go_out=plugins=grpc:. publish.proto 生成相應的go文件,重點分析:

// 客戶端接受體
type pubsubServiceClient struct {
	cc *grpc.ClientConn
}

// 客戶端調用它生成接受體
func NewPubsubServiceClient(cc *grpc.ClientConn) PubsubServiceClient {
	return &pubsubServiceClient{cc}
}

// 客戶端的方法實現
func (c *pubsubServiceClient) Publish(ctx context.Context, in *StringPub, opts ...grpc.CallOption) (*StringPub, error) {
	out := new(StringPub)
	err := c.cc.Invoke(ctx, "/proto.PubsubService/Publish", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (c *pubsubServiceClient) Subscribe(ctx context.Context, in *StringPub, opts ...grpc.CallOption) (PubsubService_SubscribeClient, error) {
	stream, err := c.cc.NewStream(ctx, &_PubsubService_serviceDesc.Streams[0], "/proto.PubsubService/Subscribe", opts...)
	if err != nil {
		return nil, err
	}
	x := &pubsubServiceSubscribeClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}

// 返回用於發送接受的對象,類似socket
type PubsubService_SubscribeClient interface {
	Recv() (*StringPub, error)
	grpc.ClientStream
}

type pubsubServiceSubscribeClient struct {
	grpc.ClientStream
}

...
func (x *pubsubServiceSubscribeClient) Recv() (*StringPub, error) {
	m := new(StringPub)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// PubsubServiceServer is the server API for PubsubService service.
type PubsubServiceServer interface {
	// 發布是rpc的普通方法
	Publish(context.Context, *StringPub) (*StringPub, error)
	// 訂閱則是一個單向的流服務,服務端返回的數據可能很大
	Subscribe(*StringPub, PubsubService_SubscribeServer) error
}

func RegisterPubsubServiceServer(s *grpc.Server, srv PubsubServiceServer) {
	s.RegisterService(&_PubsubService_serviceDesc, srv)
}

type PubsubService_SubscribeServer interface {
	Send(*StringPub) error
	grpc.ServerStream
}

  

接着我們可以實現一個發布的客戶端:pub_service.go:

package main

import (
	"context"
	"gRPC_demo/proto"
	"google.golang.org/grpc"
	"log"
)

func main() {
	conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := proto.NewPubsubServiceClient(conn)

	_, err = client.Publish(context.Background(), &proto.StringPub{Value: "golang: hello Wang"})
	if err != nil {
		log.Fatal(err)
	}

	_, err = client.Publish(context.Background(), &proto.StringPub{Value: "docker: hello Wang"})
	if err != nil {
		log.Fatal(err)
	}
}

  

最后我們實現訂閱客戶端:sub_client.go:

package main

import (
	"context"
	"fmt"
	"gRPC_demo/proto"
	"google.golang.org/grpc"
	"io"
	"log"
)

func main() {
	conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := proto.NewPubsubServiceClient(conn)

	stream, err := client.Subscribe(context.Background(),&proto.StringPub{Value:"golang: "})
	if err != nil {
		log.Fatal(err)
	}

	for {
		reply, err := stream.Recv()
		if err != nil {
			if err == io.EOF{
				break
			}
			log.Fatal(err)
		}

		fmt.Println(reply.GetValue())
	}
}

 到此發布訂閱功能就基本實現了,依次啟動服務端,訂閱客戶端,發布客戶端就可以查看結果了。

 

手寫發布訂閱

分別由服務端、客戶端代碼,以及一個調用文件。(此實例是摘抄於:http://www.itkeyword.com/doc/602627406483745x914/golang-pubsub)感覺寫的不錯,所以摘抄。

服務端代碼:service.go

package pubsub

import (
	"errors"
	"sync"
)

type Client struct {
	Id int
	Ip string
}

type Server struct {
	Dict map[string]*Channel //map[Channel.Name]*Channel
	sync.RWMutex
}

func NewServer() *Server {
	s := &Server{}
	s.Dict = make(map[string]*Channel) //所有channel
	return s
}

//訂閱
func (srv *Server) Subscribe(client *Client, channelName string) {

	// 客戶是否在Channel的客戶列表中
	srv.RLock()
	ch, found := srv.Dict[channelName]
	srv.RUnlock()

	if !found {
		ch = NewChannel(channelName)
		ch.AddClient(client)
		srv.Lock()
		srv.Dict[channelName] = ch
		srv.Unlock()
	} else {
		ch.AddClient(client)
	}

}

//取消訂閱
func (srv *Server) Unsubscribe(client *Client, channelName string) {
	srv.RLock()
	ch, found := srv.Dict[channelName]
	srv.RUnlock()
	if found {
		if ch.DeleteClient(client) == 0 {
			ch.Exit()
			srv.Lock()
			delete(srv.Dict, channelName)
			srv.Unlock()
		}
	}
}

//發布消息
func (srv *Server) PublishMessage(channelName, message string) (bool, error) {
	srv.RLock()
	ch, found := srv.Dict[channelName]
	if !found {
		srv.RUnlock()
		return false, errors.New("channelName不存在!")
	}
	srv.RUnlock()

	ch.Notify(message)
	ch.Wait()
	return true, nil
}

  

客戶端代碼:client.go

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Channel struct {
	Name    string
	clients map[int]*Client
	//  exitChan   chan int
	sync.RWMutex
	waitGroup    WaitGroupWrapper
	messageCount uint64
	exitFlag     int32
}

func NewChannel(channelName string) *Channel {
	return &Channel{
		Name: channelName,
		//  exitChan:       make(chan int),
		clients: make(map[int]*Client),
	}
}

func (ch *Channel) AddClient(client *Client) bool {
	ch.RLock()
	_, found := ch.clients[client.Id]
	ch.RUnlock()

	ch.Lock()
	if !found {
		ch.clients[client.Id] = client
	}
	ch.Unlock()
	return found
}

func (ch *Channel) DeleteClient(client *Client) int {
	var ret int
	ch.ReplyMsg(
		fmt.Sprintf("從channel:%s 中刪除client:%d ", ch.Name, client.Id))

	ch.Lock()
	delete(ch.clients, client.Id)
	ch.Unlock()

	ch.RLock()
	ret = len(ch.clients)
	ch.RUnlock()

	return ret
}

func (ch *Channel) Notify(message string) bool {

	ch.RLock()
	defer ch.RUnlock()

	for cid, _ := range ch.clients {
		ch.ReplyMsg(
			fmt.Sprintf("channel:%s client:%d message:%s", ch.Name, cid, message))
	}
	return true
}

func (ch *Channel) ReplyMsg(message string) {
	ch.waitGroup.Wrap(func() { fmt.Println(message) })
}

func (ch *Channel) Wait() {
	ch.waitGroup.Wait()
}

func (ch *Channel) Exiting() bool {
	return atomic.LoadInt32(&ch.exitFlag) == 1
}

func (ch *Channel) Exit() {
	if !atomic.CompareAndSwapInt32(&ch.exitFlag, 0, 1) {
		return
	}
	//close(ch.exitChan)
	ch.Wait()
}

func (ch *Channel) PutMessage(clientID int, message string) {
	ch.RLock()
	defer ch.RUnlock()

	if ch.Exiting() {
		return
	}

	//select {
	// case <-t.exitChan:
	// return
	//}
	fmt.Println(ch.Name, ":", message)

	atomic.AddUint64(&ch.messageCount, 1)
	return
}

  

最后是主函數文件:

package main

import (
  . "pubsub"
)

func main(){
  c1 := &Client{Id:100,Ip:"172.18.1.1"}
  c3:=  &Client{Id:300,Ip:"172.18.1.3"}

   srv := NewServer()
   srv.Subscribe(c1,"Topic")
   srv.Subscribe(c3,"Topic")

   srv.PublishMessage("Topic","測試信息1")

   srv.Unsubscribe(c3,"Topic")
   srv.PublishMessage("Topic","測試信息2222")

    srv.Subscribe(c1,"Topic2")
    srv.Subscribe(c3,"Topic2")
    srv.PublishMessage("Topic2"," Topic2的測試信息")   
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM