阿里RocketMq(TCP模式)


針對公司業務邏輯,生產端向阿里雲MQ發送指定數據,消費端根據收到的數據執行具體的業務。整體分為兩個項目:一個生產端(Producer)、一個消費端(Consumer)。

生產端通過定時任務執行sql向阿里雲MQ發送數據,消費端消費指定Topic上的數據

1:定時任務列表:

2:生產端表結構:

aliasName:定時任務別名;

cronExpression:定時任務輪詢規則;

jobGroup:定時任務分組;

jobName:定時任務名稱;

jobTrigger:定時任務觸發器;

packageUrl:定時任務掃描具體封裝類;

excuteSql:掃描類中執行的獲取數據的腳本;

lastPramaryKey:最後一次獲取數據時最大的主鍵;

topic:阿里雲MQ的topic;

producerId:生產端的Id;

accessKey、securityKey:連接阿里雲MQ所用的AccessKey與SecretKey(賬號與密鑰);

dataBaseType:操作數據庫類型(公司數據庫類型比較多,執行腳本時,需要根據類型來指定具體的Service)

3:Java端核心代碼,定時任務掃描如下配置的任務類來向阿里雲MQ發送數據

public class SendPrimaryKeyListToMqTask implements Job{
    
    private final Logger logger = LoggerFactory.getLogger(SendPrimaryKeyListToMqTask.class);
    
    public void execute(JobExecutionContext context) throws JobExecutionException{
        JobDataMap data = context.getJobDetail().getJobDataMap();
        ScheduleJob scheduleJob = (ScheduleJob)data.get("jobParam");
    
        //最后一次獲取數據時最大的主鍵
        int lastPramaryKey = scheduleJob.getLastPramaryKey();
        
        //執行sql
        String excuteSql = scheduleJob.getExcuteSql();
        excuteSql = excuteSql.replace("lastPramaryKey", String.valueOf(lastPramaryKey));
        
        //操作數據庫類型(數據庫配置)
        int dataBaseType = scheduleJob.getDataBaseType();
        
        //從游戲庫獲取數據
        LinkedList<ExcuteResultData> resultData = new LinkedList<ExcuteResultData>();
        if( dataBaseType == 1 ){
            GameService gameService = (GameService)SpringBeanFactory.getBean(GameService.class);
            resultData = gameService.getExcuteResultData(excuteSql);
        //從網站庫獲取數據
        }else if( dataBaseType == 2 ){
             SiteService siteService = (SiteService)SpringBeanFactory.getBean(SiteService.class);
             resultData = siteService.getExcuteResultData(excuteSql);
        }
    
        if ( resultData.size() > 0 ){
            scheduleJob.setPrimaryKeyList(resultData);
            QuartzService quartzService = (QuartzService)SpringBeanFactory.getBean(QuartzService.class);
            //將數據集中最大的主鍵更新
            scheduleJob.setLastPramaryKey(resultData.getLast().getPrimaryKey());
            quartzService.updateLastPramaryKey(scheduleJob);
    
            String topic = scheduleJob.getTopic();
            String producerId = scheduleJob.getProducerId();
            String ak = scheduleJob.getAccessKey();
            String sk = scheduleJob.getSecurityKey();
            
            //添加日志
            ScheduleJobLog scjLog = new ScheduleJobLog();
            scjLog.setDataSize(resultData.size());
            scjLog.setJobName(scheduleJob.getJobName());
            scjLog.setTopic(topic);
            int scjLogId = quartzService.addMqScheduleJobLog(scjLog);
            //消費端根據此日志主鍵更新日志狀態
            scheduleJob.setScjLogId(scjLogId);
            
            Properties properties = new Properties();
            properties.put("ProducerId", producerId);
            properties.put("AccessKey", ak);
            properties.put("SecretKey", sk);
            Producer producer = ONSFactory.createProducer(properties);
            producer.start();
            
            Message msg = new Message(topic, "PRIMARY_KEY_" + String.valueOf(scjLogId), ObjectsTranscoder.serialize(scheduleJob));
            msg.setKey("PRIMARY_KEY_" + String.valueOf(scjLogId));
            SendResult sendResult = producer.send(msg);
            if ( ( sendResult != null ) && ( sendResult.getMessageId() != null ) ){
                scjLog.setMessageId(sendResult.getMessageId());
                scjLog.setStatus(2);
                quartzService.updateMqScheduleJobLog(scjLog);
            }
            
            producer.shutdown();
    
            logger.debug("=====>任務名稱:" + scheduleJob.getJobName());
            logger.debug("=====>發送條數:" + resultData.size());
            logger.debug("=====>發送主鍵內容:" + resultData.toString());

        }
    }
}

4:消費端表結構:

5:消費端Java核心代碼(通過監聽器來做):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.odao.common.utils.ObjectsTranscoder;
import com.odao.entity.ScheduleJob;
import com.odao.entity.ScheduleJobLog;
import com.odao.service.consumer.ConsumerService;
import com.odao.service.message.MessageService;

/**
 * Servlet context listener that consumes primary-key batches (game and site data) from
 * Aliyun MQ.
 *
 * <p>On application start-up it loads the configured {@code ScheduleJob} subscriptions from
 * {@code MessageService}, and for each one creates and starts an ONS consumer subscribed to
 * the job's topic. On shutdown it stops every consumer it created.
 */
public class ConsumePrimaryKeyFromMqListener implements ServletContextListener {

    /** Log status: the message was received and deserialized. */
    private static final int STATUS_CONSUMED = 3;

    /** Log status: the business handler reported success. */
    private static final int STATUS_PROCESSED = 4;

    /** Consumers created by this listener, kept so they can be shut down on undeploy. */
    private final List<Consumer> consumers = new ArrayList<Consumer>();

    /**
     * Creates, subscribes and starts one MQ consumer per configured schedule job.
     *
     * @param sce servlet context event giving access to the Spring web application context
     * @throws IllegalStateException if no Spring web application context is registered
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        final ServletContext servletContext = sce.getServletContext();
        WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(servletContext);
        if (appctx == null) {
            throw new IllegalStateException(
                    "No Spring WebApplicationContext found; is ContextLoaderListener registered first?");
        }

        final MessageService messageService = (MessageService) appctx.getBean(MessageService.class);
        List<ScheduleJob> consumeList = messageService.getScheduleJobList();
        if (consumeList == null) {
            return;
        }

        for (final ScheduleJob sjc : consumeList) {
            Properties properties = new Properties();
            properties.put(PropertyKeyConst.ConsumerId, sjc.getConsumerId());
            properties.put(PropertyKeyConst.AccessKey, sjc.getAccessKey());
            properties.put(PropertyKeyConst.SecretKey, sjc.getSecurityKey());

            Consumer consumer = ONSFactory.createConsumer(properties);
            // Track before starting so contextDestroyed can release it in every case.
            consumers.add(consumer);

            consumer.subscribe(sjc.getTopic(), "*", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    // NOTE(review): ObjectsTranscoder presumably uses Java-native deserialization;
                    // confirm the topic only ever carries messages from the trusted producer.
                    ScheduleJob scheduleJob = (ScheduleJob) ObjectsTranscoder.deserialize(message.getBody());
                    if (scheduleJob != null) {
                        // Mark the producer-side log row as consumed before running business logic.
                        ScheduleJobLog scjLog = new ScheduleJobLog();
                        scjLog.setStatus(STATUS_CONSUMED);
                        scjLog.setMqScheduleJobLogId(scheduleJob.getScjLogId());
                        messageService.updateMqScheduleJobLog(scjLog);
                        try {
                            // The handler class is named by the subscription's configuration.
                            ConsumerService consumerService =
                                    (ConsumerService) Class.forName(sjc.getImplementClass()).newInstance();
                            boolean isSuccess = consumerService.consume(scheduleJob.getPrimaryKeyList());
                            if (isSuccess) {
                                scjLog.setStatus(STATUS_PROCESSED);
                                messageService.updateMqScheduleJobLog(scjLog);
                            }
                        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
                            // A misconfigured handler class will not be fixed by redelivery, so the
                            // message is still committed; the log row stays at STATUS_CONSUMED.
                            servletContext.log("Cannot instantiate consumer implementation class '"
                                    + sjc.getImplementClass() + "' for topic '" + sjc.getTopic() + "'", e);
                        }
                    }
                    // The message is committed regardless of the handler's outcome; progress is
                    // tracked through the log row status instead of MQ redelivery.
                    return Action.CommitMessage;
                }
            });

            consumer.start();
        }
    }

    /**
     * Shuts down every consumer created in {@link #contextInitialized(ServletContextEvent)}.
     *
     * @param sce servlet context event, used only for logging shutdown failures
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        for (Consumer consumer : consumers) {
            try {
                consumer.shutdown();
            } catch (RuntimeException e) {
                // Keep going so one failing consumer does not prevent the others from stopping.
                sce.getServletContext().log("Failed to shut down MQ consumer", e);
            }
        }
        consumers.clear();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM