啟動服務
首先啟動 go-nats-streaming 服務。為了更加能說明問題,我們啟動的時候不使用默認端口號
$ nats-streaming-server -p 4242 -m 8222 -DV
編寫一個簡單的應用
package main import ( "log" "github.com/nats-io/go-nats-streaming" ) func main() { //stan.Connect(clusterID, clientID, ops ...Option) ns, _ := stan.Connect("test-cluster", "myID", stan.NatsURL("nats://localhost:4242")) // Simple Synchronous Publisher // does not return until an ack has been received from NATS Streaming ns.Publish("foo", []byte("Hello World")) // Simple Async Subscriber sub, _ := ns.Subscribe("foo", func(m *stan.Msg) { log.Printf("Received a message: %s\n", string(m.Data)) }, stan.StartWithLastReceived()) log.Printf("subscribing to subject 'foo' \n") // Unsubscribe sub.Unsubscribe() // Close connection ns.Close() }
上面的代碼使用了訂閱者啟動參數的 StartWithLastReceived,這個函數的含義為:讀取剛才發布者最近發布的消息內容。具體還有哪些啟動參數,以下列出詳情:
// Subscribe starting with most recently published value sub, err := sc.Subscribe("foo", func(m *stan.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }, StartWithLastReceived()) // Receive all stored values in order sub, err := sc.Subscribe("foo", func(m *stan.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }, DeliverAllAvailable()) // Receive messages starting at a specific sequence number sub, err := sc.Subscribe("foo", func(m *stan.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }, StartAtSequence(22)) // Subscribe starting at a specific time var startTime time.Time ... sub, err := sc.Subscribe("foo", func(m *stan.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }, StartAtTime(startTime)) // Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago) sub, err := sc.Subscribe("foo", func(m *stan.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }, StartAtTimeDelta(time.Duration(30)))