根據公司業務需要,系統向阿里雲MQ發送指定數據,消費端再根據收到的數據執行具體業務。整體分為兩個項目:生產端(Producer)與消費端(Consumer)。
生產端通過定時任務執行sql向阿里雲MQ發送數據,消費端消費指定Topic上的數據
1:定時任務列表:
2:生產端表結構:
aliasName:定時任務別名;
cronExpression:定時任務輪詢規則;
jobGroup:定時任務分組;
jobName:定時任務名稱;
jobTrigger:定時任務觸發器;
packageUrl:定時任務掃描具體封裝類;
excuteSql:掃描類中執行的獲取數據的腳本;
lastPramaryKey:最後一次獲取數據時最大的主鍵;
topic:阿里雲MQ的topic;
producerId:生產端的Id;
accessKey、securityKey:阿里雲賬號的訪問密鑰(AccessKey)與安全密鑰(SecretKey);
dataBaseType:操作數據庫類型(公司數據庫類型比較多,執行腳本時,需要根據類型來指定具體的Service)
3:Java端核心代碼,定時任務掃描如下配置的任務類來向阿里雲MQ發送數據
public class SendPrimaryKeyListToMqTask implements Job{ private final Logger logger = LoggerFactory.getLogger(SendPrimaryKeyListToMqTask.class); public void execute(JobExecutionContext context) throws JobExecutionException{ JobDataMap data = context.getJobDetail().getJobDataMap(); ScheduleJob scheduleJob = (ScheduleJob)data.get("jobParam"); //最后一次獲取數據時最大的主鍵 int lastPramaryKey = scheduleJob.getLastPramaryKey(); //執行sql String excuteSql = scheduleJob.getExcuteSql(); excuteSql = excuteSql.replace("lastPramaryKey", String.valueOf(lastPramaryKey)); //操作數據庫類型(數據庫配置) int dataBaseType = scheduleJob.getDataBaseType(); //從游戲庫獲取數據 LinkedList<ExcuteResultData> resultData = new LinkedList<ExcuteResultData>(); if( dataBaseType == 1 ){ GameService gameService = (GameService)SpringBeanFactory.getBean(GameService.class); resultData = gameService.getExcuteResultData(excuteSql); //從網站庫獲取數據 }else if( dataBaseType == 2 ){ SiteService siteService = (SiteService)SpringBeanFactory.getBean(SiteService.class); resultData = siteService.getExcuteResultData(excuteSql); } if ( resultData.size() > 0 ){ scheduleJob.setPrimaryKeyList(resultData); QuartzService quartzService = (QuartzService)SpringBeanFactory.getBean(QuartzService.class); //將數據集中最大的主鍵更新 scheduleJob.setLastPramaryKey(resultData.getLast().getPrimaryKey()); quartzService.updateLastPramaryKey(scheduleJob); String topic = scheduleJob.getTopic(); String producerId = scheduleJob.getProducerId(); String ak = scheduleJob.getAccessKey(); String sk = scheduleJob.getSecurityKey(); //添加日志 ScheduleJobLog scjLog = new ScheduleJobLog(); scjLog.setDataSize(resultData.size()); scjLog.setJobName(scheduleJob.getJobName()); scjLog.setTopic(topic); int scjLogId = quartzService.addMqScheduleJobLog(scjLog); //消費端根據此日志主鍵更新日志狀態 scheduleJob.setScjLogId(scjLogId); Properties properties = new Properties(); properties.put("ProducerId", producerId); properties.put("AccessKey", ak); properties.put("SecretKey", sk); Producer producer = 
ONSFactory.createProducer(properties); producer.start(); Message msg = new Message(topic, "PRIMARY_KEY_" + String.valueOf(scjLogId), ObjectsTranscoder.serialize(scheduleJob)); msg.setKey("PRIMARY_KEY_" + String.valueOf(scjLogId)); SendResult sendResult = producer.send(msg); if ( ( sendResult != null ) && ( sendResult.getMessageId() != null ) ){ scjLog.setMessageId(sendResult.getMessageId()); scjLog.setStatus(2); quartzService.updateMqScheduleJobLog(scjLog); } producer.shutdown(); logger.debug("=====>任務名稱:" + scheduleJob.getJobName()); logger.debug("=====>發送條數:" + resultData.size()); logger.debug("=====>發送主鍵內容:" + resultData.toString()); } } }
4:消費端表結構:
5:消費端Java核心代碼(通過監聽器來做):
import java.util.List; import java.util.Properties; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.support.WebApplicationContextUtils; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.odao.common.utils.ObjectsTranscoder; import com.odao.entity.ScheduleJob; import com.odao.entity.ScheduleJobLog; import com.odao.service.consumer.ConsumerService; import com.odao.service.message.MessageService; /** * 阿里雲游戲、網站 主鍵數據集消費監聽器 */ public class ConsumePrimaryKeyFromMqListener implements ServletContextListener { @Override public void contextInitialized(ServletContextEvent sce) { WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext()); MessageService messageService = (MessageService) appctx.getBean(MessageService.class); List<ScheduleJob> consumeList = messageService.getScheduleJobList(); for(final ScheduleJob sjc : consumeList){ String topic = sjc.getTopic(); String consumerId= sjc.getConsumerId(); String ak = sjc.getAccessKey(); String sk = sjc.getSecurityKey(); Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId,consumerId); properties.put(PropertyKeyConst.AccessKey,ak); properties.put(PropertyKeyConst.SecretKey,sk); //properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { ScheduleJob 
scheduleJob = (ScheduleJob) ObjectsTranscoder.deserialize(message.getBody()); if( scheduleJob !=null ){ //更新消息狀態為3:消費消息成功 ScheduleJobLog scjLog = new ScheduleJobLog(); scjLog.setStatus(3); scjLog.setMqScheduleJobLogId(scheduleJob.getScjLogId()); messageService.updateMqScheduleJobLog(scjLog); try { ConsumerService consumerService = (ConsumerService) Class.forName(sjc.getImplementClass()).newInstance(); boolean isSuccess = consumerService.consume(scheduleJob.getPrimaryKeyList()); if(isSuccess){ //更新消息狀態為4:業務邏輯處理成功 scjLog.setStatus(4); messageService.updateMqScheduleJobLog(scjLog); } } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } return Action.CommitMessage; } }); consumer.start(); } } @Override public void contextDestroyed(ServletContextEvent sce) { } }