是什么
Pulsar 是一個用於服務器到服務器的消息系統,具有多租戶、高性能等優勢。詳見
安裝
本文主練習怎么用,本地搭建了一個單機版,無非就是wget、tar、start這些命令,詳見
Java客戶端
1.引入GAV
<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.8.0</version>
</dependency>
2.創建配置項
用於連接Pulsar等配置
-
Yml配置
pulsar: url: 10.20.30.228:6650 # url: 10.20.30.228:6650,10.20.30.228:6651 #集群配置
-
新增配置類
package com.project.pulsar.conf; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class PulsarConf { @Value("${pulsar.url}") String url; @Bean public PulsarClient pulsarFactory(){ PulsarClient client = null; try { client = PulsarClient.builder() .serviceUrl("pulsar://"+url) .build(); } catch (PulsarClientException e) { } return client; } }
3.驗證測試
通過簡單生產-消費測試配置是否正常
-
創建BaseController
注意,subscriptionName要保證唯一
package com.project.pulsar.base; import com.project.pulsar.conf.PulsarConf; import org.apache.pulsar.client.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController public class BaseController { @Autowired PulsarConf pulsarConf; /** * 生產消息 * @param msg * @throws PulsarClientException */ @GetMapping("/base/sendMsg") public MessageId sendMsg(String msg) throws PulsarClientException { PulsarClient pulsarFactory = pulsarConf.pulsarFactory(); Producer<byte[]> producer1 = pulsarFactory.newProducer() .topic("my-topic") .create(); // 然后你就可以發送消息到指定的broker 和topic上: return producer1.send(msg.getBytes()); } /** * 手動執行獲取消息 * @throws PulsarClientException */ @GetMapping("/base/comsumer") public void comsumerByArtificial() throws PulsarClientException { PulsarClient pulsarFactory = pulsarConf.pulsarFactory(); Consumer<byte[]> consumer = pulsarFactory.newConsumer() .topic("my-topic") .subscriptionName("my-subscription") .subscribe(); Message<byte[]> receive = consumer.receive(); System.out.println(new String(receive.getData())); consumer.acknowledge(receive);//確認消息被消費 consumer.close(); } /** * 自動監聽消費消息 * @throws PulsarClientException */ @Bean public void comsumerByListener() throws PulsarClientException { MessageListener myMessageListener = (consumer, msg) -> { try { System.out.println("Message received: " + new String(msg.getData())); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } }; PulsarClient pulsarFactory = pulsarConf.pulsarFactory(); pulsarFactory.newConsumer() .topic("my-topic") .subscriptionName("my-subscriptionByListener") .messageListener(myMessageListener) .subscribe(); } }
-
生產消息
[127.0.0.1:9999/base/sendMsg?msg=Hello RB](http://127.0.0.1:9999/base/sendMsg?msg=Hello RB)
-
消費消息
- 在生產后,如果采用監聽模式,會自動消費
- 在生產后,如果采用手動模式,執行127.0.0.1:9999/base/comsumer會被消費,如隊列中無消費,則會阻塞等待
其他及代碼下載
- topic不用顯式創建,當消息發送或消費者建立連接時,如未創建會自動創建
- 代碼見此Base包下