NATS_13:NATS Streaming案例講解


啟動服務

  首先啟動 go-nats-streaming 服務。為了更加能說明問題,我們啟動的時候不使用默認端口號

 $ nats-streaming-server -p 4242 -m 8222 -DV

 編寫一個簡單的應用

package main

import (
    "log"
    "github.com/nats-io/go-nats-streaming"
)

func main() {
    //stan.Connect(clusterID, clientID, ops ...Option)
    ns, _ := stan.Connect("test-cluster", "myID", stan.NatsURL("nats://localhost:4242"))

    // Simple Synchronous Publisher
    // does not return until an ack has been received from NATS Streaming
    ns.Publish("foo", []byte("Hello World"))

    // Simple Async Subscriber
    sub, _ := ns.Subscribe("foo", func(m *stan.Msg) {
        log.Printf("Received a message: %s\n", string(m.Data))
    }, stan.StartWithLastReceived())

    log.Printf("subscribing to subject 'foo' \n")

    // Unsubscribe
    sub.Unsubscribe()

    // Close connection
    ns.Close()
}

  上面的代碼使用了訂閱者啟動參數的 StartWithLastReceived,這個函數的含義為:讀取剛才發布者最近發布的消息內容。具體還有哪些啟動參數,以下列出詳情:

// Subscribe starting with most recently published value
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartWithLastReceived())

// Receive all stored values in order
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, DeliverAllAvailable())

// Receive messages starting at a specific sequence number
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartAtSequence(22))

// Subscribe starting at a specific time
var startTime time.Time
...
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartAtTime(startTime))

// Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago)
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartAtTimeDelta(time.Duration(30)))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM