同步數據庫數據到ES中代碼


多節點部署保證HA,分布式鎖代碼

復制代碼
 
public class DistributedLock implements Watcher,Runnable{
    
    private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
    private int threadId;
    private ZKConnector zkClient;
    private String selfPath;
    private String waitPath;
    private String LOG_PREFIX_OF_THREAD;
    private AbstractApplicationContext ctx;
    private static boolean hascreated = false;
    
    //確保連接zookeeper成功
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    //確保每個進程運行結束
    private static final CountDownLatch threadSemaphore = new CountDownLatch(Constant.THREAD_NUM);
    
    
    public ZKConnector getZkClient() {
        return zkClient;
    }
    public void setZkClient(ZKConnector zkClient) {
        this.zkClient = zkClient;
    }
    
    public DistributedLock(int id,AbstractApplicationContext context,ZKConnector zkClient){
        this.threadId = id;
        this.zkClient = zkClient;
        LOG_PREFIX_OF_THREAD = Thread.currentThread().getName().concat("_").concat(String.valueOf(Thread.currentThread().getId()));
        
        try{
            zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT);
            //GROUP_PATH 不存在的話,由一個線程創建即可
            synchronized (threadSemaphore) {
                if(!zkClient.exist(Constant.GROUP_PATH)){
                    zkClient.createPersistNode(Constant.GROUP_PATH, "該節點由線程"+threadId+"創建");
                }
            }
            ctx = context;
            }catch(Exception e){
                e.printStackTrace();
            }
    }
    
    
    @Override
    public void run() {
        getLock();
    }
    
    @Override
    public void process(WatchedEvent event) {
        if(event ==null){
            return;
        }
        if(KeeperState.SyncConnected ==event.getState()){
            if(EventType.None == event.getType()){
                connectedSemaphore.countDown();
            }else if(event.getType()==EventType.NodeDeleted && event.getPath().equals(waitPath)){
                if(checkMinPath()){
                    getLockSuccess();    
                }
            }
            
        }
    }
    
    /**
     * 獲取鎖邏輯:
     * 首先是上來先在zookeeper上注冊一把屬於自己的鎖,然后修改狀態為已創建
     * 第二步,檢查自己是否是最小id的鎖,若是則獲取鎖,不是則繼續等待
     */
    private void getLock(){
        if(!hascreated){
            selfPath = this.getZkClient().createEsquentialNode(Constant.SUB_PATH, "");
            hascreated = true;
        }
        if(checkMinPath()){
            getLockSuccess();
        }else{
            Executor.run(this, 1, 1,TimeUnit.SECONDS);
        }
        
    }
    
    /**
     * 檢查自己是不是最小路徑
     * @return
     */
    public boolean checkMinPath(){
        List<String> subNodes = this.getZkClient().getChildren(Constant.GROUP_PATH);
        Collections.sort(subNodes);
        //查找"/syncLocks"后面的路徑
        int index = subNodes.indexOf(selfPath.substring(Constant.GROUP_PATH.length()+1));
        switch(index){
            case -1:{
                return false;
            }
            case 0:{
                return true;
            }
            default:{
                this.waitPath = Constant.GROUP_PATH+"/"+subNodes.get(index-1);
                //Logger.info("waitPath: "+waitPath);
                this.getZkClient().readData(waitPath);
                if(!this.getZkClient().exist(waitPath)){
                    return checkMinPath();
                }
            }
        }
        return false;
    }
    
    /**
     * 獲取鎖成功
     */
    public void getLockSuccess(){
        if(!this.getZkClient().exist(selfPath)){
            logger.error(LOG_PREFIX_OF_THREAD+"本節點已不存在.");
            return;
        }
        logger.info(LOG_PREFIX_OF_THREAD + "獲取鎖成功,進行同步工作!");
        
        try{
            new Worker(ctx).doWork();
        }catch(Exception ex){
            logger.info(ex.getMessage());
            Executor.run(this, 1, 1, TimeUnit.SECONDS);
            return;
        }
        
        logger.info(LOG_PREFIX_OF_THREAD+"刪除本節點:"+selfPath);
        this.getZkClient().deleteNode(selfPath);
        this.getZkClient().releaseConnection();
        threadSemaphore.countDown();
    }
    
    
}
復制代碼

 

執行同步工作代碼

復制代碼
public class Worker {
    
    private static final Logger logger = LoggerFactory.getLogger(Worker.class);
    private static JdbcTemplate jdbcTemplate;
    private final ObjectMapper mapper = new ObjectMapper();
    private ZKConnector zkClient =null;
    private TransportClient client =null;
    private Timestamp currentTimestamp = null;
    private Timestamp previousTimestamp = null;    
    private static final String oggSql = "select * from t_order t0 left join t_order_attachedinfo t1 on t0.order_id = t1.order_id where ";
    
    private String sql;
    
    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }
    
    private TransportClient getClient() {
        Settings settings = Settings.settingsBuilder().put("cluster.name", Constant.CLUSTER).build();
        TransportClient client = TransportClient.builder().settings(settings).build();
        try {
            client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(Constant.ESHOST), Constant.ESPORT));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return client;
    }
    
    public Worker(AbstractApplicationContext ctx){
        //初始化Oracle連接
        jdbcTemplate = (JdbcTemplate) ctx.getBean("jdbcTemplate");
        client = getClient();
        zkClient = new ZKConnector();
        zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT);
        
        //初始化zookeeper鎖,由於zookeeper不能聯級創建
        if(!zkClient.exist(Constant.ZK_PATH)){
            zkClient.createPersistNode(Constant.ZK_PATH,"");
        }
        
        /**
         * 獲取zookeeper的最后同步時間
         */
        if(currentTimestamp == null){
            String zkTimestamp = zkClient.readData(Constant.NODE_PATH);
            if(zkTimestamp != null && !zkTimestamp.equals(""))
            {
                try
                {
                    currentTimestamp = Timestamp.valueOf(zkTimestamp);
                    logger.info("獲取zookeeper最后同步時間: "+currentTimestamp);
                }catch(Exception e){
                    zkClient.deleteNode(Constant.NODE_PATH);
                }
            }
        }
    }
    
    /**
     * 同步work的邏輯:
     *     將Oracle里面的規則表同步到緩存當中
     *     首先是訪問Oracle里面數據,通過訪問最小鎖里面的同步時間戳,查詢出大於同步時間戳的數據
     *  如果在zookeeper中獲取的時間戳為空,則查詢條件增加時間戳,寫入存儲框架
     *  寫入成功之后,將最后一條記錄的同步時間戳寫到zookeeper集群中
     *  若寫入失敗,和zookeeper握手失敗,會話鎖消失
     *  然后導入ElasticSearch中
     */
    public void doWork(){
        logger.info("start ...");
        //一直進行同步工作
        while(true){
            String sqlwhere = "";
            //根據時間戳獲取Mycat中規則表數據
            String sql = "";
            //若最后一次同步時間為空,則按最后更新時間排序,取最小的時間作為當前時間戳
            if(currentTimestamp != null){
                sql = "select order_id,timestamp from t_order_changes  where rownum <= 10 and timestamp > to_timestamp('" + currentTimestamp.toString() + "','yyyy-mm-dd hh24:mi:ss.ff6')";
            }else{
                sql = "select order_id,timestamp from t_order_changes  where rownum <= 10 order by timestamp";
            }
            
            //查詢該時間段的訂單id
            List<String> ids = new ArrayList<String>();
            
            //升序會將最后一次的時間也就是最大的時間作為當前的currentTimeStamp
            ids = jdbcTemplate.query(sql, new Object[] {}, new RowMapper<String>() 
            {
                public String mapRow(ResultSet result, int rowNum) throws SQLException {
                    currentTimestamp = result.getTimestamp("timestamp");
                    return result.getString("order_id");
                }
            });        
            
            if(ids.size() ==0){
                continue;
            }
            
            int i =0;
            List<String> checkIds = new ArrayList<String>();
            for (String id : ids) {
                //若存在更新的id則跳過
                if (checkIds.contains(id)) {
                    continue;
                }
                if (i == 0) {
                    sqlwhere = sqlwhere.concat(" t0.order_id = '" + id + "'");
                } else {
                    sqlwhere = sqlwhere.concat(" or t0.order_id = '" + id + "'");
                }
                checkIds.add(id);
                i++;
            }
            
            System.out.println(oggSql.concat(sqlwhere));
            //objs 即是Oracle里面查詢出來需要同步的數據
            List<JSONObject> objs = jdbcTemplate.query(oggSql.concat(sqlwhere), new Object[] {}, new RowMapper<JSONObject>() 
            {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    
                public JSONObject mapRow(ResultSet result, int rowNum) throws SQLException {
                    int c = result.getMetaData().getColumnCount();
                    JSONObject obj = new JSONObject();
                    
                    for(int t =1 ;t <= c;t++)
                    {
                        if(result.getObject(t) == null)
                        {
                            continue;
                        }
                        if(result.getMetaData().getColumnType(t) == Types.DATE)
                        {
                            obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(), result.getDate(t));
                        }else if(result.getMetaData().getColumnType(t) == Types.TIMESTAMP)
                        {
                            Date date = new Date(result.getTimestamp(t).getTime());
                            String f = sdf.format(date);
                            obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),sdf.format(date));
                        }else
                        {
                            obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(), result.getObject(t));
                        }
                    }
                    return obj;
                }
            });
            
            /*for (JSONObject obj : objs) {
                System.out.println(obj.toJSONString());
            }*/
            
            /**
             * 將查詢出來的數據寫入到elasticsearch中
             */
            BulkRequestBuilder bulkRequest =null;
            try {
                bulkRequest = client.prepareBulk();

                for (JSONObject obj : objs) {
                    byte[] json;

                    try {
                        json = mapper.writeValueAsBytes(obj);
                        bulkRequest.add(new IndexRequest(Constant.INDEX, Constant.INDEX, obj.getString("order_id"))
                                .source(json));

                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                }

                BulkResponse bulkResponse = bulkRequest.get();

                if (bulkResponse.hasFailures()) {
                    logger.info("====================批量創建索引過程中出現錯誤 下面是錯誤信息==========================");  
                    long count = 0L;  
                    for (BulkItemResponse bulkItemResponse : bulkResponse) {  
                        System.out.println("發生錯誤的 索引id為 : "+bulkItemResponse.getId()+" ,錯誤信息為:"+ bulkItemResponse.getFailureMessage());  
                        count++;  
                    }  
                    logger.info("====================批量創建索引過程中出現錯誤 上面是錯誤信息 共有: "+count+" 條記錄=========================="); 
                    currentTimestamp = previousTimestamp;
                } else {
                    logger.info("The lastest currenttimestamp : ".concat(currentTimestamp.toString()));
                    previousTimestamp = currentTimestamp;
                    //將寫入成功后的時間寫到zookeeper中
                    zkClient.writeData(Constant.NODE_PATH, String.valueOf(currentTimestamp));
                }

            } catch (NoNodeAvailableException e) {
                currentTimestamp = previousTimestamp;
                e.printStackTrace();
            }
        }

    }
    
    
}
復制代碼

 

調度工具代碼

復制代碼
public class Executor {
    private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    public static void run(Runnable r,long init,long delay,TimeUnit u){
        service.scheduleWithFixedDelay(r, init, delay, u);
    }
    
    
}
復制代碼

 

啟動類

復制代碼
public class StartRcpSync {
    
    private static final Logger Logger = LoggerFactory.getLogger(StartRcpSync.class);
    private static AbstractApplicationContext appContext = null;
    private static String confPath = null;
    
    static{
        //后續來讀取命令中的conf 例如 java -Dconf=conf/*.xml -classpath .:lib/*
        if(System.getProperty("conf") !=null){
            System.out.println(System.getProperty("user.dir"));
            confPath = System.getProperty("conf");
            System.out.println("讀取配置路徑conf目錄:"+confPath);
            appContext = new FileSystemXmlApplicationContext(confPath.concat("/applicationContext*.xml"));
        }else{
            confPath = "E:/aa/bb/src/main/resources/conf";
            appContext = new FileSystemXmlApplicationContext(confPath.concat("/applicationContext*.xml"));
        }
    }
    
    public static void main(String[] args) {
        Logger.info("Sync will starting ...");
        //加載配置文件
        appContext.registerShutdownHook();
        appContext.start();
        Logger.info("Sync has been started successfully.");
        //獲取zookeeper的連接
        ZKConnector zkClient = new ZKConnector();
        DistributedLock dl = new DistributedLock(new Random().nextInt(),appContext,zkClient);
        dl.run();
    }
    
    //just for Test 
    public static void DoTask(){
        Worker w =new Worker(appContext);
        w.doWork();
    }
    
    
}
復制代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM