springboot項目整合nsq,消費者直連nsqd


    項目需要用到nsq,並且是單點的,網上看到的springboot整合nsq都是先連接lookup后,從lookup獲取nsqd的連接信息。由於本項目用到了docker,映射的端口都不是原始端口了。而lookup分發的連接nsqd還是默認的4150端口。所以是連不通的。這時候需要直連nsqd,撇開lookup。

    查看官網  https://nsq.io/clients/client_libraries.html

 

    由於要用到認證,所以只能選擇   nsq-j   。翻看點進去的文檔。有個名字是直連消費者的類:  DirectSubscriber  

    看api以及父類的api,感覺可用。

  加入依賴包:

  

 <dependency>
            <groupId>com.sproutsocial</groupId>
            <artifactId>nsq-j</artifactId>
            <version>1.0</version>
        </dependency>

  

import com.sproutsocial.nsq.DirectSubscriber;
import com.sproutsocial.nsq.MessageDataHandler;
import com.sproutsocial.nsq.Subscriber;
import com.tslsmart.big.base.util.DateUtils;
import com.tslsmart.big.device.constance.NsqConstance;
import com.tslsmart.big.device.dao.entity.WatchManTaskLog;
import com.tslsmart.big.device.dao.mapper.WatchManTaskLogMapper;
import com.tslsmart.big.device.protobuffer.response.Response;
import com.tslsmart.sz.nsq.properties.NSQCloudProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Component
@Slf4j
public class WatchManNsqConsume implements ApplicationRunner {

    @Autowired
    private NSQCloudProperties nsqCloudProperties;

    @Autowired
    private TaskLogMapper taskLogMapper;

    @Override
    public void run(ApplicationArguments applicationArguments) throws InterruptedException {
        Subscriber directSubscriber = null;
        while(Objects.isNull(directSubscriber) || directSubscriber.getConnectionCount() == 0){
            Thread.sleep(500);
            log.info("==========="+DateUtils.getNowDateTime()+"啟動消費者=======================");
            String serverAddrs = nsqCloudProperties.getServerAddrs();
            if(StringUtils.isNotEmpty(serverAddrs)){
                directSubscriber = new DirectSubscriber(2, serverAddrs.split(","));
                directSubscriber.subscribe(NsqConstance.TSL_DATA_RES, "server_data_receive", new MessageDataHandler() {
                    @Override
                    public void accept(byte[] bytes) {
                        log.info("==========="+DateUtils.getNowDateTime()+"同步空間數據給平台==接收消息====start");
                        Response.Res_Detail builder = null;
                        try {
                            builder = Response.Res_Detail.parseFrom(bytes);
                            int status = builder.getStatus();
                            TaskLog taskLog = new TaskLog();
                            taskLog.setTasklogid(builder.getSeqNo());
                            if(1 == status){
                                taskLog.setHandleresult(true);
                            }else{
                                taskLog.setErrmsg(builder.getErrMsg());
                            }
                            TaskLogMapper.updateByPrimaryKey(taskLog);
                            log.info("==========="+DateUtils.getNowDateTime()+"同步空間數據給平台==接收消息====end=={}",
                                    builder.getSeqNo());
                        } catch (Exception e) {
                            log.error("==========="+DateUtils.getNowDateTime()+"同步空間數據給平台出錯======"+e.getMessage());
                            log.error("同步報錯:",e);
                        }
                    }
                });
                //directSubscriber.stop();
                log.error("==========="+DateUtils.getNowDateTime()+" directSubscriber.stop()======:"+directSubscriber
                        .getConnectionCount());
            }
        }


    }
}

  

測試可用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM