ONS發布訂閱消息


ONS, 全名Open Notification Service, 是阿里基於開源消息中間件RocketMQ的一個雲產品。

首先,要申請阿里賬號等。本地也可以申請阿里雲賬號自己調試。此處為公司擁有阿里雲服務,與運維op申請ons測試可用的服務。

申請ONS需要的創建topic,PID,CID,access_key,secret_key,要給產生access_key的賬戶授權發布訂閱功能權限,具體請看其他配置阿里雲賬戶權限的文章。

提供了這些以后,可以在官網查看ons快速入門文檔創建簡單的demo測試類調試生產和消費

 

使用ONS API發送訂閱消息的示例

首先,pom引用ons-client

        <!-- RocketMq ons-client-->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.7.0.Final</version>
        </dependency>

第二步,配置消費者:

  • 創建一個常量類存放 
/**
 * 公共參數配置
 *
 */
public interface MqConfigParams {
 
    //測試ons配置文件
    public static final String TOPIC = "你申請下來的topic";
    public static final String TAG = "*";
    public static final String PRODUCER_ID = "你申請下來的pid";
    public static final String CONSUMER_ID = "你申請下來的cid";
    public static final String ACCESS_KEY = "你申請下來的accesskey";
    public static final String SECRET_KEY = "你申請下來的secretkey";
    public static final String ONS_ADDR = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
 
}
  • 創建一個生產者類,此處填入配置文件中的常量類,然后調用getproduce方法時啟動生產者 producer.start();
import com.aliyun.openservices.ons.api.*;
import java.util.Date;
import java.util.Properties;

public class ONSUtil {
 
    /**
     * 獲取消息的 Producer
     *
     * @return Producer
     */
    public static Producer getProducer() {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ProducerId, MqConfigParams.PRODUCER_ID);
        properties.put(PropertyKeyConst.AccessKey, MqConfigParams.ACCESS_KEY);
        properties.put(PropertyKeyConst.SecretKey, MqConfigParams.SECRET_KEY);
        Producer producer = ONSFactory.createProducer(properties);
 
        // 在發送消息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();
        return producer;
    }
} 
  • 封裝一個發送消息的方法。
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
 
@Service
public class OnsProducer {
    private final static Logger logger = LoggerFactory.getLogger(SeoLogController.class);
 
    public void sendMessage(String topic, String tag, String msgStr) {
        StringBuilder logsb = new StringBuilder("send OnsMQ Msg:");
        Message msg = getInstance(topic, tag, msgStr);
        String messageId = "";
        Producer producer = ONSUtil.getProducer(); //你申請的producerId
        SendResult sendResult = producer.send(msg);
        messageId = sendResult.getMessageId();
        if (messageId != null && !messageId.equals("")) {
            logsb.append("[OnsProducer] : " + "{ messageId : " + messageId + " , msgStr :" + msgStr + "}");
        } else {
            logsb.append("[OnsProducer] fail: " + "{ " + " msgStr :" + msgStr + "}");
        }
        logger.debug(logsb.toString());
    }
 
    private static Message getInstance(String topic, String tag, String body) {
        if (body.equals("") || body == null)
            body = "";
        Message msg = new Message(topic, tag, body.getBytes());
        return msg;
    }
}  

這樣就配置好了生產者。

 

調用生產者

可以寫一個test類用main方法調試,示例這里是直接寫main方法執行上面封裝的內容直接執行看查看日志看是否成功

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestSendMessage { private final static Logger logger = LoggerFactory.getLogger(TestSendMessage.class); public static void main(String[] args) { StringBuilder logsb = new StringBuilder("send OnsMQ Msg:"); Producer producer = ONSUtil.getProducer(); //你申請的producerId Message msg = new Message(MqConfigParams.TOPIC, //你申請的TopicName "*","這是消息2".getBytes()); SendResult sendResult = producer.send(msg); String messageId = ""; messageId = sendResult.getMessageId(); if (messageId != null || !messageId.equals("")) { logsb.append("[OnsProducer] : " + "{ messageId : " + messageId + " , msgStr :" + "這是消息2" + "}"); } else { logsb.append("[OnsProducer] fail: " + "{ " + " msgStr :" + "這是消息2" + "}"); } logger.debug(logsb.toString()); } }
    @RequestMapping(value = "/testmqProducer", method = RequestMethod.GET)
    public void testmq() {
         OnsProducer onsProducer=new OnsProducer();
         String msg = "testmq";
         onsProducer.sendMessage(MqConfigParams.TOPIC,"*",msg);
    }

 

接下來寫消費者示例:

  • 創建監聽類實現MessageListener此處是處理業務邏輯因為此處可以得到消費的內容
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
 
import java.util.Date;
 
public class MyMessageListener implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        System.out.println("Receive @" + new Date() + ": " + message);
        //此處可以寫具體業務邏輯,body是具體發送的內容
        String body = new String(message.getBody());
        System.out.println("msgBody is : " + body);
        return Action.CommitMessage;
    }
} 
  • 創建消費者啟動類和方法
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
 
import java.util.Date;
import java.util.Properties;
 
public class MyMessageConsumer {
    /**
     * 訂閱消息
     */
    public void subscribe() {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ConsumerId, MqConfigParams.CONSUMER_ID);
        properties.put(PropertyKeyConst.AccessKey, MqConfigParams.ACCESS_KEY);
        properties.put(PropertyKeyConst.SecretKey, MqConfigParams.SECRET_KEY);
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(MqConfigParams.TOPIC, "*", new MyMessageListener());//此處可以寫父類messageListener但是必須實現方法這里就是用到了上面寫的myMessageListener
        consumer.start();
        System.out.println(MqConfigParams.CONSUMER_ID + " is running @" + new Date());
    }
}
  • 在Application啟動類中加入消費者啟動調用方法 
@SpringBootApplication
@MapperScan("com.ons.test")
public class Application {
    public static void main(String[] args) {
        // 啟動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件
        SpringApplication.run(Application.class,args);
        new MyMessageConsumer().subscribe(); // 系統啟動的時候啟動訂閱
    }}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM