1.java連接pulsar服務


是什么

Pulsar 是一個用於服務器到服務器的消息系統,具有多租戶、高性能等優勢。詳見

安裝

本文主練習怎么用,本地搭建了一個單機版,無非就是wget、tar、start這些命令,詳見

Java客戶端

1.引入GAV

        <!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client -->
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.8.0</version>
        </dependency>

2.創建配置項

用於連接Pulsar等配置

  • Yml配置

    pulsar:
      url: 10.20.30.228:6650
    #  url: 10.20.30.228:6650,10.20.30.228:6651 #集群配置
    
  • 新增配置類

    package com.project.pulsar.conf;
    
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class PulsarConf {
        @Value("${pulsar.url}")
        String url;
    
        @Bean
        public PulsarClient pulsarFactory(){
            PulsarClient client = null;
            try {
                client = PulsarClient.builder()
                        .serviceUrl("pulsar://"+url)
                        .build();
            } catch (PulsarClientException e) {
            }
            return client;
        }
    }
    

3.驗證測試

​ 通過簡單生產-消費測試配置是否正常

  • 創建BaseController

    注意,subscriptionName要保證唯一

    package com.project.pulsar.base;
    
    
    import com.project.pulsar.conf.PulsarConf;
    import org.apache.pulsar.client.api.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @RestController
    public class BaseController {
        @Autowired
        PulsarConf pulsarConf;
    
        /**
         * 生產消息
         * @param msg
         * @throws PulsarClientException
         */
        @GetMapping("/base/sendMsg")
        public MessageId sendMsg(String msg) throws PulsarClientException {
            PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
    
            Producer<byte[]> producer1 = pulsarFactory.newProducer()
                    .topic("my-topic")
    
                    .create();
            // 然后你就可以發送消息到指定的broker 和topic上:
            return producer1.send(msg.getBytes());
        }
    
        /**
         * 手動執行獲取消息
         * @throws PulsarClientException
         */
        @GetMapping("/base/comsumer")
        public void comsumerByArtificial() throws PulsarClientException {
            PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
            Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                    .topic("my-topic")
                    .subscriptionName("my-subscription")
                    .subscribe();
            Message<byte[]> receive = consumer.receive();
            System.out.println(new String(receive.getData()));
            consumer.acknowledge(receive);//確認消息被消費
            consumer.close();
        }
    
        /**
         * 自動監聽消費消息
         * @throws PulsarClientException
         */
        @Bean
        public void comsumerByListener() throws PulsarClientException {
            MessageListener myMessageListener = (consumer, msg) -> {
                try {
                    System.out.println("Message received: " + new String(msg.getData()));
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(msg);
                }
            };
            PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
            pulsarFactory.newConsumer()
                    .topic("my-topic")
                    .subscriptionName("my-subscriptionByListener")
                    .messageListener(myMessageListener)
                    .subscribe();
        }
    
    
    }
    
    
  • 生產消息

    [127.0.0.1:9999/base/sendMsg?msg=Hello RB](http://127.0.0.1:9999/base/sendMsg?msg=Hello RB)

  • 消費消息

    • 在生產后,如果采用監聽模式,會自動消費
    • 在生產后,如果采用手動模式,執行127.0.0.1:9999/base/comsumer會被消費,如隊列中無消費,則會阻塞等待

其他及代碼下載

  • topic不用顯式創建,當消息發送或消費者建立連接時,如未創建會自動創建
  • 代碼見此Base包下


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM