【Apache Pulsar】Apache Pulsar單機環境及Go語言開發環境搭建


 

0x01 簡介

Apache Pulsar是一個開源的分布式發布-訂閱消息系統,與Kafka類似,但比后者更加強大。Pulsar最初由Yahoo開發並維護,目前已經成為Apache軟件組織的一個孵化子項目,當前最新版本號為2.1.0-incubating。官網地址:http://pulsar.apache.org/

0x02 Apache Pulsar單機版環境搭建

1、前提條件

Pulsar目前僅僅支持MacOS和Linux系統,不支持Windows系統。並且要求系統中安裝了Java 8環境。

2、系統環境

我們以CentOS系統作為搭建環境,系統為CentOS7.2。

3、搭建步驟

首先,訪問官網下載網頁http://pulsar.apache.org/en/download/,如下圖所示:

由於我本地環境的限制,所以本文中所有的文件下載都是首先在Windows系統中下載,然后手動拷貝到Linux服務器上的。

此處,我們點擊下載第一個,即二進制發布。然后拷貝到CentOS服務器上,並解壓該壓縮包,結果如下:

進入對應解壓得到的文件夾,該文件夾下文件如下:

進入conf文件夾下,並使用vi或vim打開文件client.conf,修改里面的webServiceUrl和brokerServiceUrl字段中對應的IP為該服務器IP,如下所示(其中塗改部分為服務器IP):

保存並退出,然后進入到bin目錄下,以后台運行模式啟動pulsar服務,如下:

由於之前我已經啟動了后台服務,所以上圖中提示已經在運行該服務。
如此簡單,pulsar單機版就這么順利的運行起來了。然而,如何驗證是否正常啟動了呢?一種是通過查看日志文件來確保正常啟動,此處略去這種方式。直接使用指令來驗證是否正常啟動:
(1)創建消費者consumer
指令:

$ ./pulsar-client consume -s "my-subscription" my-topic

含義:創建一個consumer,該consumer訂閱的topic名稱為my-topic,本訂閱名稱為my-subscription。創建成功會打印如下信息(只截圖了部分信息):

創建成功后,該consumer就處於等待接收消息狀態。

(2)創建生產者producer

$ ./pulsar-client produce my-topic --messages "test message from producer"

含義:創建一個producer,該producer對應的topic名稱為my-topic(與上面創建的consumer訂閱的topic相同),發送的消息由--messages指定,此處內容為“test message from producer”。創建成功會打印如下信息(只截圖了部分信息):

此時,我們會在1中創建的consumer端接收到producer發送的消息,如下圖:

至此,說明我們的pulsar服務正常運行。

0x03 Pulsar Go語言開發環境搭建

前提條件:開發電腦本地或Linux服務器中已經安裝好了Go開發環境。

在Windows系統中開發Pulsar時需要安裝GCC編譯環境,所以需要安裝MinGW,由於環境限制,這里我無法下載MinGW,所以就直接在CentOS系統中搭建開發環境了。

當前版本(2.1.0-incubating)下,Pulsar官方僅僅提供了C++、Java、Python、Go四種語言的客戶端開發包。且四種語言的支持特性不盡相同,如下所示:

此外,還有一些第三方的客戶端包,如下:

由於Pulsar Go客戶端庫是基於C++客戶端庫的,所以在安裝Go庫之前必須要確保已經成功安裝了C++客戶端庫。

1、安裝Pulsar C++客戶端

在Pulsar C++客戶端網頁中,下載下圖中所示的三個文件:

然后將下載的三個文件拷貝到CentOS服務器上,如下:

然后執行如下命令來安裝這三個RPM包:

$ rpm -ivh apache-pulsar-client*.rpm

此處暫且先不驗證是否安裝成功。

2、安裝Pulsar Go客戶端

由於我環境所限制,無法使用go get的方式來下載Pulsar的Go語言包,我是直接在GitHub上面下載的incubator-pulsar-branch-2.1.zip,解壓該文件得到如下內容:

此處,我們僅僅需要里面的pulsar-client-go文件夾里面的內容,根據官網上的示例程序可知該go語言包的路徑如下:

所以我們將pulsar-client-go拷貝到CentOS服務器上$GOPATH/src/github.com/apache/incubator-pulsar下,如果中間文件夾不存在就自己創建,最終如下:

到此,Pulsar Go客戶端包安裝完成。

此時,我們使用一個簡單的Pulsar Go程序來驗證上面安裝是否正常,程序內容如下:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "runtime"
 6     "context"
 7     "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
 8     "log"
 9 )
10 
11 func main (){
12     fmt.Println("Pulsar Producer")
13 
14     ctx := context.Background()
15 
16     //實例化Pulsar client
17     client,err := pulsar.NewClient(pulsar.ClientOptions{
18         URL:"pulsar://xx.xx.xx.xx:6650",  //xx.xx.xx.xx代表Pulsar IP
19         OperationTimeoutSeconds:5,
20         MessageListenerThreads:runtime.NumCPU()/2,
21     })
22 
23     if err !=  nil {
24         log.Fatalf("Could not instantiate Pulsar client:%v",err)
25     }
26 
27 
28     // 創建producer
29     producer,err := client.CreateProducer(pulsar.ProducerOptions{
30         Topic:"my-topic",
31     })
32 
33     if err != nil {
34         log.Fatalf("Could not instantiate Pulsar producer:%v",err)
35     }
36 
37     defer producer.Close()
38 
39     msg := pulsar.ProducerMessage{
40         Payload:[]byte("Hello,This is a message from Pulsar Producer!"),
41     }
42 
43     if err := producer.Send(ctx,msg);err != nil {
44         log.Fatalf("Producer could not send message:%v",err)
45     }
46 
47 }

編譯並運行,結果如下:

但其實我們查看/usr/lib路徑下發現,其實是存在libpulsar.so.2.1.0-incubating這個庫文件的,所以應該是系統沒有引用到這個路徑:

所以,需要將該文件所在的路徑添加到到/etc/ld.so.conf中,如下:

此時,再次運行程序時則成功:

0x04 Pulsar producer和consumer示例程序

此處直接給出代碼,里面有必要的注釋:

1、producer

文件:pulsar-producer.go

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "runtime"
 6     "context"
 7     "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
 8     "log"
 9 )
10 
11 func main (){
12     fmt.Println("Pulsar Producer")
13 
14     ctx := context.Background()
15 
16     //實例化Pulsar client
17     client,err := pulsar.NewClient(pulsar.ClientOptions{
18         URL:"pulsar://xx.xx.xx.xx:6650",  // xx.xx.xx.xx代表Pulsar IP
19         OperationTimeoutSeconds:5,
20         MessageListenerThreads:runtime.NumCPU()/2,
21     })
22 
23     if err !=  nil {
24         log.Fatalf("Could not instantiate Pulsar client:%v",err)
25     }
26 
27 
28     // 創建producer
29     producer,err := client.CreateProducer(pulsar.ProducerOptions{
30         Topic:"my-topic",
31     })
32 
33     if err != nil {
34         log.Fatalf("Could not instantiate Pulsar producer:%v",err)
35     }
36 
37     defer producer.Close()
38 
39     msg := pulsar.ProducerMessage{
40         Payload:[]byte("Hello,This is a message from Pulsar Producer!"),
41     }
42 
43     if err := producer.Send(ctx,msg);err != nil {
44         log.Fatalf("Producer could not send message:%v",err)
45     }
46 
47 }

2、consumer

文件:pulsar-consumer.go

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
 6     "log"
 7     "context"
 8 )
 9 
10 func main()  {
11     fmt.Println("Pulsar Consumer")
12 
13     //實例化Pulsar client
14     client,err := pulsar.NewClient(pulsar.ClientOptions{
15         URL:"pulsar://xx.xx.xx.xx:6650", // xx.xx.xx.xx代表Pulsar IP
16     })
17 
18     if err != nil {
19         log.Fatal(err)
20     }
21 
22     //使用client對象實例化consumer
23     consumer,err := client.Subscribe(pulsar.ConsumerOptions{
24         Topic:"my-topic",
25         SubscriptionName:"sub-demo",
26     })
27 
28     if err != nil {
29         log.Fatal(err)
30     }
31 
32     defer consumer.Close()
33 
34     ctx := context.Background()
35 
36     //無限循環監聽topic
37     for {
38         msg,err := consumer.Receive(ctx)
39         if err != nil {
40             log.Fatal(err)
41         } else {
42             fmt.Printf("Received message : %v",string(msg.Payload()))
43         }
44 
45         consumer.Ack(msg)
46         
47     }
48 
49 }

這兩個go文件分別處於兩個項目中,其項目結構分別如下:

然后,分別編譯這兩個go項目,並生成可執行文件。首先運行pulsar-consumer,打開消費者程序,此時該消費者程序處於監聽消息狀態,如下:

然后,運行pulsar-producer,打開生產者程序,如下:

該生產者程序發送完一條消息之后即運行結束並退出。
此時,再回到消費者程序運行界面,就可以看到消費者這邊已經接收到了生產者發送的那條消息:

到此,Go語言版本的最簡單的producer和consumer代碼就完成了。

0x05 參考鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM