介紹
使用golang連接activemq發送數據的話,需要使用一個叫做stomp的包,直接go get github.com/go-stomp/stomp即可
代碼
生產者
package main
import (
"fmt"
"github.com/go-stomp/stomp"
"time"
)
func main(){
// 調用Dial方法,第一個參數是"tcp",第二個參數則是ip:port
// 返回conn(連接)和err(錯誤)
conn,err:=stomp.Dial("tcp", "47.adsasaads89:61613")
// 錯誤判斷
if err!=nil{
fmt.Println("err =", err)
return
}
//發送十條數據
for i:=0;i<10;i++ {
// 調用conn下的send方法,接收三個參數
//參數一:隊列的名字
//參數二:數據類型,一般是文本類型,直接寫text/plain即可
//參數三:內容,記住要轉化成byte數組的格式
//返回一個error
err := conn.Send("testQ", "text/plain",[]byte(fmt.Sprintf("message:%d", i)))
if err!=nil{
fmt.Println("err =", err)
}
}
/*
這里為什么要sleep一下,那就是conn.Send這個過程是不阻塞的
相當於Send把數據放到了一個channel里面
另一個goroutine從channel里面去取數據再放到消息隊列里面
但是還沒等到另一個goroutine放入數據,此時循環已經結束了
因此最好要sleep一下,根據測試,如果不sleep,那么發送1000條數據,
最終進入隊列的大概是980條數據,這說明了什么
說明了當程序把1000條數據放到channel里面的時候,另一個goroutine只往隊列里面放了980條
剩余的20條還沒有來得及放入,程序就結束了
*/
time.Sleep(time.Second * 1)
}
消費者
package main
import (
"fmt"
"github.com/go-stomp/stomp"
"time"
)
func recv_data(ch chan *stomp.Message) {
//不斷地循環,從channel里面獲取數據
for {
v := <-ch
//這里是打印當然還可以做其他的操作,比如寫入hdfs平台
//v是*stomp.Message類型,屬性都在這里面
/*
type Message struct {
// Indicates whether an error was received on the subscription.
// The error will contain details of the error. If the server
// sent an ERROR frame, then the Body, ContentType and Header fields
// will be populated according to the contents of the ERROR frame.
Err error
// Destination the message has been sent to.
Destination string
// MIME content type.
ContentType string // MIME content
// Connection that the message was received on.
Conn *Conn
// Subscription associated with the message.
Subscription *Subscription
// Optional header entries. When received from the server,
// these are the header entries received with the message.
Header *frame.Header
// The ContentType indicates the format of this body.
Body []byte // Content of message
}
*/
fmt.Println(string(v.Body))
}
}
func main() {
//創建一個channel,存放的是*stomp.Message類型
ch := make(chan *stomp.Message)
//將管道傳入函數中
go recv_data(ch)
//和生產者一樣,調用Dial方法,返回conn和err
conn, err := stomp.Dial("tcp", "47.dsdsadsa9:61613")
if err != nil {
fmt.Println("err =", err)
}
//消費者訂閱這個隊列
//參數一:隊列名
//參數二:確認信息,直接填默認地即可
sub, err := conn.Subscribe("testQ", stomp.AckMode(stomp.AckAuto))
for { //無限循環
select {
//sub.C是一個channel,如果訂閱的隊列有數據就讀取
case v := <-sub.C:
//讀取的數據是一個*stomp.Message類型
ch <- v
//如果30秒還沒有人發數據的話,就結束
case <-time.After(time.Second * 30):
return
}
}
}
message:0
message:1
message:2
message:3
message:4
message:5
message:6
message:7
message:8
message:9