1.1需求
數據庫300 萬條用戶數據 ,遍歷獲取所有用戶, 各種組合關聯, 獲取到一個新的json ,存到redis 上。
1.2 難點
數據庫比較多, 不可能單線程查詢所有的數據到內存。
1.3解決辦法
多線程讀取, 生產者 每次獲取200 條數據, 消費者去消費。(這里 主要是根據MySQL分頁去獲取下一個200 條數據)
1.4 代碼
1.4.1 調用方法
/** * 線程啟動 */ public void update() {
//redis操作類 HashRedisUtil redisUtil= HashRedisUtil.getInstance(); //生產者消費者 ProducerConsumer pc = new ProducerConsumer(); //數據倉庫 Storage s = pc.new Storage(); ExecutorService service = Executors.newCachedThreadPool(); //一個線程進行查詢 Producer p = pc.new Producer(s,userMapper); service.submit(p); System.err.println("生產線程正在生產中。。。。。。。。。"); //是個線程進行修改 for(int i=0;i<10;i++){ System.err.println("消費線程"+i+"正在消費中。。。。。。。。。。"); service.submit(pc.new Consumer( redisUtil,userMapper,s)); } }
1.4.2 主要核心類
package com.ypp.thread; import java.math.BigDecimal; import java.util.Calendar; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.joda.time.LocalDateTime; import com.alibaba.fastjson.JSONObject; import com.ypp.constants.Constants; import com.ypp.mapper.UserMapper; import com.ypp.model.User; import com.ypp.model.UserAlis; import com.ypp.model.UserBaseModel; import com.ypp.model.UserVip; import com.ypp.util.HashRedisUtil; import com.ypp.util.JsonUtils; import com.ypp.util.PHPSerializer; public class ProducerConsumer { private static Logger logger = Logger.getLogger(ProducerConsumer.class);
//這個page 是核心, 全局變量, 當生產者生產一次 ,獲取200 個用戶, 會把這個page++, 下次獲取就是后一個200 條用戶了 private static Integer page = 0; //消費者
public class Consumer implements Runnable { private HashRedisUtil redisUtil; private UserMapper userMapper; private Storage s = null; public Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) { super(); this.redisUtil = redisUtil; this.userMapper = userMapper; this.s = s; } public void run() { try { while (true) { User users = s.pop(); long bbb = System.currentTimeMillis(); // 獲取一個用戶的粉絲列表 並存到redis try { fansUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e1) { e1.printStackTrace(); } // 獲取一個用戶的關注列表, 並存到redis try { followUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } // 獲取一個用戶的黑名單, 並存到redis try { blackUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } // 用戶基本信息 try { userbaseUpdate(users.getToken(), users.getUserId(), redisUtil); } catch (Exception e) { e.printStackTrace(); } long ccc = System.currentTimeMillis(); System.out.println("用戶:" + users.getToken() + " 全部總共耗時:" + (ccc - bbb) + "毫秒"); Thread.sleep(500); } } catch (InterruptedException e) { e.printStackTrace(); } } public List<User> getUserInfo(Integer iThread) { return userMapper.findUserInfo((iThread - 1) * 200 + 1); } /** * 用戶基本信息修改 * * @param token * @param myuserId * @param redisUtil * @throws Exception */ private void userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 更新一個用戶的黑名單(原來的token改成userID) * * @param token * @param string * @param redisUtil * @throws Exception */ private void blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 獲取一個用戶的關注 * * @param token * @param string * @param redisUtil * @throws Exception */ private void followUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /** * 獲取一個用戶的粉絲列表 * * @param token * @param userId * @param redisUtil * @throws Exception */ private void fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } //生產者 public class Producer implements Runnable { private Storage s = null; private UserMapper mapper ; public Producer( Storage s, UserMapper mapper) { this.s = s; this.mapper = mapper; } public void run() { try { while (true) { System.err.println("當前分頁是:"+page+"****************************************"); List<User> list= mapper.findUserInfo(page); s.push(list); page++; } } catch (InterruptedException e1) { e1.printStackTrace(); } } }
//數據倉庫 public class Storage { BlockingQueue<User> queues = new LinkedBlockingQueue<User>(200); /** * 生產 * * @param p * 產品 * @throws InterruptedException */ public void push(List<User> p) throws InterruptedException { for(User user:p){ queues.put(user); } } /** * 消費 * * @return 產品 * @throws InterruptedException */ public User pop() throws InterruptedException { return queues.take(); } } }
