嘗試基於grpc和docker pubsub包,提供一個跨網絡的發布和訂閱系統
安裝依賴: go get github.com/moby/moby/pkg/pubsub
首先通過proto定義一個發布和訂閱服務接口:
syntax="proto3";
package pb;
option go_package="../pb";
message String{
string value=1;
}
service PubsubService{
rpc Publish (String) returns (String);
rpc Subscribe (String) returns (stream String);
}
其中Publish是一個普通的grpc服務函數,Subscribe是一個單向流函數,接收一個我們自定義的String類型,返回的是一個String流。
下面來實現發布和訂閱服務了
首先定義一個我們需要的發布訂閱的結構體來實現發布訂閱函數:
type PubsubService struct {
pub *pubsub.Publisher
}
func NewPubsubService() *PubsubService {
return &PubsubService{
// 新建一個Publisher對象
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
}
}
我們簡單看看pubsub.Publisher這個結構體
其中有四個變量:讀寫鎖、緩沖區大小、超時時間、為訂閱者提供的函數過濾器,其中也提供了一些對應的方法,之后我們需要用到的時候再看
消息發布接口的實現:
// Publish 實現發布方法
func (p *PubsubService) Publish(ctx context.Context, arg *pb.String) (*pb.String, error) {
// 發布消息
p.pub.Publish(arg.GetValue())
return &pb.String{}, nil
}
在此我們使用了pub.Publish()方法,該方法會將參數中的數據發送給當前訂閱該發布者的所有訂閱者
消息訂閱接口的實現:
// Subscribe 實現訂閱方法
func (p *PubsubService) Subscribe(arg *pb.String, stream pb.PubsubService_SubscribeServer) error {
// SubscribeTopic 增加一個使用函數過濾器的訂閱者
// func(v interface{}) 定義函數過濾的規則
// SubscribeTopic 返回一個chan interface{}
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
// 接收數據是string,並且key是以arg為前綴的
if key, ok := v.(string); ok {
if strings.HasPrefix(key, arg.GetValue()) {
return true
}
}
return false
})
// 服務器遍歷chan,並將其中信息發送給訂閱客戶端
for v := range ch {
if err := stream.Send(&pb.String{Value: v.(string)}); err != nil {
return err
}
}
return nil
}
我們模擬一個客戶端,向服務器發布信息:
// 從客戶端向服務器發布信息
func main() {
conn, err := grpc.Dial("localhost:1234",
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 新建一個客戶端
client := pb.NewPubsubServiceClient(conn)
// 客戶端發布信息 golang :hello Go
_, err = client.Publish(context.Background(), &pb.String{Value: "golang: hello Go"})
if err != nil {
log.Fatal(err)
}
// 客戶端發布信息 docker: hello Docker
_, err = client.Publish(context.Background(), &pb.String{Value: "docker: hello Docker"})
if err != nil {
log.Fatal(err)
}
}
然后在另一個文件中,模仿訂閱者去訂閱這個服務,注意我們的函數過濾器規則,看看訂閱者會收到什么信息?
func main() {
conn, err := grpc.Dial("localhost:1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 新建一個客戶端
client := pb.NewPubsubServiceClient(conn)
// 訂閱服務,傳入參數是 golang:
// 會想過濾器函數,訂閱者應該收到的信息為 golang: hello Go
stream, err := client.Subscribe(context.Background(), &pb.String{Value: "golang: "})
if err != nil {
log.Fatal(err)
}
// 阻塞遍歷流,輸出結果
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
}