NATS_13:NATS Streaming案例讲解


启动服务

  首先启动 go-nats-streaming 服务。为了更加能说明问题,我们启动的时候不使用默认端口号

 $ nats-streaming-server -p 4242 -m 8222 -DV

 编写一个简单的应用

package main

import (
    "log"
    "github.com/nats-io/go-nats-streaming"
)

func main() {
    //stan.Connect(clusterID, clientID, ops ...Option)
    ns, _ := stan.Connect("test-cluster", "myID", stan.NatsURL("nats://localhost:4242"))

    // Simple Synchronous Publisher
    // does not return until an ack has been received from NATS Streaming
    ns.Publish("foo", []byte("Hello World"))

    // Simple Async Subscriber
    sub, _ := ns.Subscribe("foo", func(m *stan.Msg) {
        log.Printf("Received a message: %s\n", string(m.Data))
    }, stan.StartWithLastReceived())

    log.Printf("subscribing to subject 'foo' \n")

    // Unsubscribe
    sub.Unsubscribe()

    // Close connection
    ns.Close()
}

  上面的代码使用了订阅者启动参数的 StartWithLastReceived,这个函数的含义为:读取刚才发布者最近发布的消息内容。具体还有哪些启动参数,以下列出详情:

// Subscribe starting with most recently published value
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartWithLastReceived())

// Receive all stored values in order
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, DeliverAllAvailable())

// Receive messages starting at a specific sequence number
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartAtSequence(22))

// Subscribe starting at a specific time
var startTime time.Time
...
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartAtTime(startTime))

// Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago)
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
    fmt.Printf("Received a message: %s\n", string(m.Data))
}, StartAtTimeDelta(time.Duration(30)))

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM