Java豆瓣電影爬蟲——減少與數據庫交互實現批量插入


  節前一個誤操作把mysql中record表和movie表都清空了,顯然我是沒有做什么mysql備份的。所以,索性我把所有的表數據都清空的,一夜回到解放前……

  項目地址:https://github.com/DMinerJackie/JewelCrawler

  在上一個版本中,record表存儲了7萬多條記錄,爬取的有4萬多條,但是可以明顯的發現爬取的數據量越多的時候,機子就越卡。又一次報錯,是有關JDBC的,還有一次機子跑卡死了。

  仔細一琢磨,上個版本的爬蟲程序與數據庫的讀寫次數太頻繁,存在以下問題:

    1.程序運行,從種子地址開始,對於每次爬取的網站地址先查詢數據庫是否存在該條記錄,如果不存在,則立即插入;

    2.當前網站地址爬取完畢后,查找數據庫從中取出第一個crawled為0的記錄進行爬取,每次只取一條;

    3.存儲電影詳情頁記錄以及短評數據都是采用解析一條則立即存儲到數據庫。

  顯然,上面的這種方式是一目了然的效率低下,所以今天下午對相關代碼進行改造,部分實現了批量插入,盡可能減少與數據庫的交互,從而降低時空成本。

 

  在git clone完項目后,發現一個很詭異的現象,JewelCrawler每次都是爬取種子地址,並沒有一次查詢數據庫中crawled字段為0的記錄進行一一爬取,但是之前在本機上是完美運行的,可能是在push代碼前做了改動影響運行了。

既然問題出現了,就順着這個版本看看,最終發現問題的原因是對於種子網址並沒有存儲到mysql的record表中,所以在DoubanCrawler類中

//set boolean value "crawled" to true after crawling this page
sql = "UPDATE record SET crawled = 1 WHERE URL = '" + url + "'";
stmt = conn.createStatement();

 if (stmt.executeUpdate(sql) > 0) {
          //get the next page that has not been crawled yet
           sql = "SELECT * FROM record WHERE crawled = 0";
           stmt = conn.createStatement();
           rs = stmt.executeQuery(sql);
           if (rs.next()) {
                    url = rs.getString(2);
           } else {
                    //stop crawling if reach the bottom of the list
                    break;
           }

           //set a limit of crawling count
           if (count > Constants.maxCycle || url == null) {
                    break;
           }
}

  執行stmt.executeUpdate(sql) > 0是返回的值為0,從而不會從數據庫中讀取crawled為0的記錄,最后就一直在while的循環中爬取種子網站。

  解決方法:對於種子網站既然沒有存儲到record的操作,那么就對種子網站做特殊處理,將if的判斷條件改為if (stmt.executeUpdate(sql) > 0 || frontPage.equals(url)),這樣對於種子網站即使沒有update更新成功操作仍然可以進入讀取數據庫crawled為0 的操作。

 

針對第一個問題,采用批量插入操作

  實現思路:對於當前爬取的網站地址,解析網頁源碼,提取出所有的link,對於符合正則表達式過濾的link,將其存到一個list集合中。遍歷完當前網址的所有link后,將符合條件的link批量存儲到數據庫中。

  具體實現如下

public static void parseFromString(String content, Connection conn) throws Exception {

        Parser parser = new Parser(content);
        HasAttributeFilter filter = new HasAttributeFilter("href");

        String sql1 = null;
        ResultSet rs1 = null;
        PreparedStatement pstmt1 = null;
        Statement stmt1 = null;

        List<String> nextLinkList = new ArrayList<String>();

        int rowCount = 0;
        sql1 = "select count(*) as rowCount from record";
        stmt1 = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
        rs1 = stmt1.executeQuery(sql1);
        if (rs1.next()) {
            rowCount = rs1.getString("rowCount") != null ? Integer.parseInt(rs1.getString("rowCount")) : 0;
        }

        if (rowCount <= Constants.maxCycle) { //once rowCount is bigger than maxCycle, the new crawled link will not insert into record table
            try {
                NodeList list = parser.parse(filter);
                int count = list.size();

                //process every link on this page
                for (int i = 0; i < count; i++) {
                    Node node = list.elementAt(i);

                    if (node instanceof LinkTag) {
                        LinkTag link = (LinkTag) node;
                        String nextLink = link.extractLink();
                        String mainUrl = Constants.MAINURL;

                        if (nextLink.startsWith(mainUrl)) {
                                //check if the link already exists in the database
                                sql1 = "SELECT * FROM record WHERE URL = '" + nextLink + "'";
                                stmt1 = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
                                rs1 = stmt1.executeQuery(sql1);
                                if (rs1.next()) {

                                } else {
                                    Pattern moviePattern = Pattern.compile(Constants.MOVIE_REGULAR_EXP);
                                    Matcher movieMatcher = moviePattern.matcher(nextLink);

                                    Pattern commentPattern = Pattern.compile(Constants.COMMENT_REGULAR_EXP);
                                    Matcher commentMatcher = commentPattern.matcher(nextLink);

                                    if (movieMatcher.find() || commentMatcher.find()) {
                                        nextLinkList.add(nextLink);
                                    }
                                }
                        }
                    }
                }
                if (nextLinkList.size() > 0) {
                    conn.setAutoCommit(false);
                    //if the link does not exist in the database, insert it
                    sql1 = "INSERT INTO record (URL, crawled) VALUES (?,0)";
                    pstmt1 = conn.prepareStatement(sql1, Statement.RETURN_GENERATED_KEYS);
                    for (String nextLinkStr : nextLinkList) {
                        pstmt1.setString(1, nextLinkStr);
                        pstmt1.addBatch();
                        System.out.println(nextLinkStr);
                    }
                    pstmt1.executeBatch();
                    conn.commit();
                }
            } catch (Exception e) {
                //handle the exceptions
                e.printStackTrace();
                System.out.println("SQLException: " + e.getMessage());
            } finally {
                //close and release the resources of PreparedStatement, ResultSet and Statement
                if (pstmt1 != null) {
                    try {
                        pstmt1.close();
                    } catch (SQLException e2) {
                    }
                }
                pstmt1 = null;

                if (rs1 != null) {
                    try {
                        rs1.close();
                    } catch (SQLException e1) {
                    }
                }
                rs1 = null;

                if (stmt1 != null) {
                    try {
                        stmt1.close();
                    } catch (SQLException e3) {
                    }
                }
                stmt1 = null;
            }
        }
    }

  1.通過正則匹配,找到符合條件的link,並添加到nextLinkList集合中

      2.遍歷完后,將數據存到數據庫中

    3. 在批量操作中,使用了addBatch()方法和executeBatch()方法,注意需要添加conn.setAutoCommit(false);以及conn.commit()表示手動提交。

 

針對第二個問題,采用一次查詢多條記錄

  實現思路:將每次只查詢一條記錄,改為每次查詢10條記錄,並將這10條記錄存放到list集合中,並將原來的String類型的url改為list類型的urlList傳入到DouBanHttpGetUtil.getByString()方法里。這樣即減少了與數據庫的交互,同時也減少了對於getByString方法的調用。

  具體實現如下

public static void main(String args[]) throws Exception {

        //load and read seed file
        List<String> seedList = LoadSeed.loadSeed();
        if (seedList == null) {
            log.info("No seed to crawl, please check again");
            return;
        }
        String frontPage = seedList.get(0);

        //connect database mysql
        Connection conn = DBUtils.connectDB();

        //create tables to store crawled data
        DBUtils.createTables();

        String sql = null;
        String url = frontPage;
        Statement stmt = null;
        ResultSet rs = null;
        int count = 0;
        List<String> urlList = new ArrayList<String>();
        urlList.add(url);

        //crawl every link in the database
        while (true) {
            //get page content of link "url"
            DouBanHttpGetUtil.getByString(urlList, conn);
            count++;

            //set boolean value "crawled" to true after crawling this page

            //TODO batch update
            int result = 0;
            conn.setAutoCommit(true);
            for (String urlStr : urlList) {
                sql = "UPDATE record SET crawled = 1 WHERE URL = '" + urlStr + "'";
                stmt = conn.createStatement();
                stmt.executeUpdate(sql);
            }

            urlList.clear();//empty for every loop
            if (stmt.executeUpdate(sql) > 0  || frontPage.equals(url)) {
                //get the next page that has not been crawled yet
                sql = "SELECT * FROM record WHERE crawled = 0 limit 10";
                stmt = conn.createStatement();
                rs = stmt.executeQuery(sql);
                while (rs.next()) {
                    url = rs.getString(2);
                    urlList.add(url);
                }

                //set a limit of crawling count
                if (rs.next() || count > Constants.maxCycle || url == null) {
                    break;
                }
            }
        }
        conn.close();
        conn = null;

        System.out.println("Done.");
        System.out.println(count);
    }

  注意: 1.這里采用每次讀取10條記錄,相應的也需要將這10條記錄的crawled字段更新為1,表示爬取過。

     2. mysql不支持top 10 * 這樣的語法,但是可以通過代碼中所示的limit 10 的方式取出數據。

                 3. 添加conn.setAutoCommit(true);表示更新操作設置為自動提交,這樣就可以解決雖然程序執行成功但是數據沒有更新到數據庫的現象。

 

針對第三個問題,與第一個問題解決方法相同。

雖然不知道這樣做帶來的效果有多明顯,或有是否有更好的解決方案,但是可以肯定的是上個版本的代碼會大量占用內存並頻繁與數據庫交互。本人是數據庫小白,希望有更好的方案可以提出來^_^

 

如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM