gRPC 發布訂閱模式


嘗試基於grpc和docker pubsub包,提供一個跨網絡的發布和訂閱系統

安裝依賴: go get github.com/moby/moby/pkg/pubsub

 

首先通過proto定義一個發布和訂閱服務接口:

syntax="proto3";
package pb;
option go_package="../pb";

message String{
  string value=1;
}

service PubsubService{
  rpc Publish (String) returns (String);
  rpc Subscribe (String) returns (stream String);
}

 

其中Publish是一個普通的grpc服務函數,Subscribe是一個單向流函數,接收一個我們自定義的String類型,返回的是一個String流。

下面來實現發布和訂閱服務了

首先定義一個我們需要的發布訂閱的結構體來實現發布訂閱函數:

type PubsubService  struct {
   pub *pubsub.Publisher
}

func NewPubsubService() *PubsubService {
   return &PubsubService{
      // 新建一個Publisher對象
      pub: pubsub.NewPublisher(100*time.Millisecond, 10),
   }
}

 

我們簡單看看pubsub.Publisher這個結構體

 

其中有四個變量:讀寫鎖、緩沖區大小、超時時間、為訂閱者提供的函數過濾器,其中也提供了一些對應的方法,之后我們需要用到的時候再看

消息發布接口的實現:

// Publish 實現發布方法
func (p *PubsubService) Publish(ctx context.Context, arg *pb.String) (*pb.String, error) {
    // 發布消息
    p.pub.Publish(arg.GetValue())
    return &pb.String{}, nil
}

 

在此我們使用了pub.Publish()方法,該方法會將參數中的數據發送給當前訂閱該發布者的所有訂閱者

消息訂閱接口的實現:

// Subscribe 實現訂閱方法
func (p *PubsubService) Subscribe(arg *pb.String, stream pb.PubsubService_SubscribeServer) error {
	// SubscribeTopic 增加一個使用函數過濾器的訂閱者
	// func(v interface{}) 定義函數過濾的規則
	// SubscribeTopic 返回一個chan interface{}
	ch := p.pub.SubscribeTopic(func(v interface{}) bool {
		// 接收數據是string,並且key是以arg為前綴的
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, arg.GetValue()) {
				return true
			}
		}
		return false
	})

	// 服務器遍歷chan,並將其中信息發送給訂閱客戶端
	for v := range ch {
		if err := stream.Send(&pb.String{Value: v.(string)}); err != nil {
			return err
		}
	}
	return nil
}

 

我們模擬一個客戶端,向服務器發布信息:

// 從客戶端向服務器發布信息
func main() {
	conn, err := grpc.Dial("localhost:1234",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// 新建一個客戶端
	client := pb.NewPubsubServiceClient(conn)

	// 客戶端發布信息 golang :hello Go
	_, err = client.Publish(context.Background(), &pb.String{Value: "golang: hello Go"})
	if err != nil {
		log.Fatal(err)
	}

	// 客戶端發布信息 docker: hello Docker
	_, err = client.Publish(context.Background(), &pb.String{Value: "docker: hello Docker"})
	if err != nil {
		log.Fatal(err)
	}
}

 

然后在另一個文件中,模仿訂閱者去訂閱這個服務,注意我們的函數過濾器規則,看看訂閱者會收到什么信息?

func main() {
	conn, err := grpc.Dial("localhost:1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// 新建一個客戶端
	client := pb.NewPubsubServiceClient(conn)

	// 訂閱服務,傳入參數是 golang:
	// 會想過濾器函數,訂閱者應該收到的信息為 golang: hello Go
	stream, err := client.Subscribe(context.Background(), &pb.String{Value: "golang: "})
	if err != nil {
		log.Fatal(err)
	}

	// 阻塞遍歷流,輸出結果
	for {
		reply, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			log.Fatal(err)
		}
		fmt.Println(reply.GetValue())
	}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM