雖然個人也不怎么推薦activeMQ, 只是由於項目需要, 所以也做一個簡單的整理, 在訂閱的時候 ,一般我們的業務都是處理字符串,但有時候AckMode 設置為AckAuto不可以,客服端處理完業務后在發回確認,所以訂閱封裝了2個方法
utils.go如下:
//Usage: // // //Send // if err := utils.NewActiveMQ("localhost:61613").Send("/queue/test-1", "test from 1"); err != nil { // fmt.Println("AMQ ERROR:", err) // // //this func will handle the messges get from activeMQ server. // handler := func(msg string,err error) { fmt.Println("AMQ MSG:", err, msg) } // if err := utils.NewActiveMQ("localhost:61613").Subscribe("/queue/test-1", handler); err != nil { // fmt.Println("AMQ ERROR:", err) // } // package utils import ( "time" "github.com/go-stomp/stomp" ) type ActiveMQ struct { Addr string } var options = []func(*stomp.Conn) error{ //設置讀寫超時,超時時間為1個小時 stomp.ConnOpt.HeartBeat(7200*time.Second, 7200*time.Second), stomp.ConnOpt.HeartBeatError(360 * time.Second), } //New activeMQ with addr[eg:localhost:61613] as host address. func NewActiveMQ(addr string) *ActiveMQ { if addr == "" { addr = "localhost:61613" } return &ActiveMQ{addr} } // Used for health check func (this *ActiveMQ) Check() error { conn, err := this.Connect() if err == nil { defer conn.Disconnect() return nil } else { return err } } // Connect to activeMQ func (this *ActiveMQ) Connect() (*stomp.Conn, error) { return stomp.Dial("tcp", this.Addr, options...) } // Send msg to destination func (this *ActiveMQ) Send(destination string, msg string) error { conn, err := this.Connect() if err != nil { return err } defer conn.Disconnect() return conn.Send( destination, // destination "text/plain", // content-type []byte(msg)) // body } // Subscribe Message from destination // func handler handle msg reveived from destination func (this *ActiveMQ) Subscribe(destination string, ack stomp.AckMode, handler func(msg *stomp.Message, con *stomp.Conn, err error)) error { conn, err := this.Connect() if err != nil { return err } //sub, err := conn.Subscribe(destination, stomp.AckAuto) sub, err := conn.Subscribe(destination, ack) if err != nil { return err } defer conn.Disconnect() defer sub.Unsubscribe() for { m := <-sub.C handler(m, conn, m.Err) } return err } // func (this *ActiveMQ) SubscribeAuto(destination string, handler func(msg string, err error)) error { conn, err := this.Connect() if err != nil { return err } sub, err := conn.Subscribe(destination, stomp.AckAuto) if err != nil { return err } defer conn.Disconnect() defer sub.Unsubscribe() for { m := <-sub.C handler(string(m.Body), m.Err) } return err }
調用就非常簡單了:
package main import ( "fmt" "main/utils" "strconv" "github.com/go-stomp/stomp" ) func main() { //生產者 go func() { mq := utils.NewActiveMQ("localhost:61613") for i := 0; i < 100; i++ { mq.Send("main", "demo"+strconv.Itoa(i+1)) } }() //消費者 go func() { mq := utils.NewActiveMQ("localhost:61613") //mq.SubscribeAuto("main", handler) mq.Subscribe("main", stomp.AckClient, handler2) }() fmt.Println("activeMQ test") var s string fmt.Scan(&s) } func handler(msg string, err error) { if err != nil { fmt.Println(err) } else { fmt.Println("AMQ MSG:", msg) } } func handler2(msg *stomp.Message, con *stomp.Conn, err error) { if err != nil { fmt.Println(err) } else { fmt.Println("AMQ MSG:", string(msg.Body)) con.Ack(msg) } }