觀察者模式實際應用:監聽線程,意外退出線程后自動重啟


摘要:  觀察者模式,定義對象之間的一種一對多的依賴關系,當對象的狀態發生改變時,所有依賴於它的對象都得到通知並且被自動更新。觀察者模式在JDK中有現成的實現,java.util.Obserable。

  《設計模式就該這么學系列》文章:

設計模式就該這么學:為什么要學設計模式?(開篇漫談)

設計模式就該這么學:要走心才能遵循設計模式五大原則(第二篇)

設計模式就該這么學:以微信訂閱號來講觀察者模式(第三篇)

觀察者模式實際應用:監聽線程,意外退出線程后自動重啟

首先說下需求:通過ftp上傳約定格式的文件到服務器指定目錄下,應用程序能實時監控該目錄下文件變化,如果上傳的文件格式符合要求,將將按照每一行讀取解析再寫入到數據庫,解析完之后再將文件改名。(這個是原先已經實現了的功能,請看我的一篇文章java利用WatchService實時監控某個目錄下的文件變化並按行解析(注:附源代碼)

但項目上線一段時間后,發現再利用FileZilla登陸上傳文件,文件不能被解析,而重啟tomcat之后再上傳,又能解析,於是判定是監控指定目錄的那個線程掛掉了,導致上傳后的文件不能被檢測到,故也不能被解析。之后查看日志也最終驗證了我推斷。

  所以關鍵的問題就是:如何監聽線程,當意外退出線程后進行自動重啟,這也是本文所要利用觀察者模式實現的。

下面請看實現過程(尤其見紅色注解部分):

  1、web.xml監聽器配置文件監控監聽器,初始化創建一個監控指定目錄的線程  

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 
    http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">

    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath:root-context.xml</param-value>
    </context-param>

    <filter>
        <filter-name>CharacterEncodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>CharacterEncodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <filter>
        <filter-name>sitemesh</filter-name>
        <filter-class>com.opensymphony.sitemesh.webapp.SiteMeshFilter</filter-class>
    </filter>

    <filter-mapping>
        <filter-name>sitemesh</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet>
        <servlet-name>appServlet</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:servlet-context.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>appServlet</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
    
    <!-- 配置spring監聽器 -->
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <!-- 配置監控文件變化監聽器 -->
    <listener>
        <listener-class>com.zealer.ad.listener.ThreadStartUpListenser</listener-class>
    </listener>
    <listener>
        <listener-class>com.zealer.ad.listener.SessionLifecycleListener</listener-class>
    </listener>
    
    
    <jsp-config>
      <taglib>
       <taglib-uri>/tag</taglib-uri>
       <taglib-location>/WEB-INF/tag/tag.tld</taglib-location>
      </taglib>
    </jsp-config>

    <welcome-file-list>
        <welcome-file>index.jsp</welcome-file>
    </welcome-file-list>
    
    <session-config>
        <session-timeout>45</session-timeout>
    </session-config>
</web-app>

  2、觀察者對象:編寫一個觀察者實現類

    當“監控指定目錄線程”,即WatchFilePathTask 掛掉后,會調用doBusiness(),doBusiness()調用notifyObservers(),notifyObservers()調用所有的具體觀察者的update()方法,也就是下面這個實現的重啟線程的方法

    package com.zealer.ad.listener;
        
    import java.util.Observable;
    import java.util.Observer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.zealer.ad.task.WatchFilePathTask; public class ObserverListener implements Observer{ private Log log = LogFactory.getLog(ObserverListener.class); @Override public void update(Observable o, Object arg) { log.info("WatchFilePathTask掛掉"); WatchFilePathTask run = new WatchFilePathTask(); run.addObserver(this); new Thread(run).start(); log.info("WatchFilePathTask重啟"); } }

  3、編寫一個ThreadStartUpListenser類,實現ServletContextListener,tomcat啟動時創建后台線程

package com.zealer.ad.listener;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

import com.zealer.ad.task.WatchFilePathTask;

@Component
public class ThreadStartUpListenser implements ServletContextListener
{
    private static WatchFilePathTask r = new WatchFilePathTask();

    private Log log = LogFactory.getLog(ThreadStartUpListenser.class);
    
    @Override
    public void contextDestroyed(ServletContextEvent paramServletContextEvent)
    {
//        r.interrupt();
         
    }

    @Override
    public void contextInitialized(ServletContextEvent paramServletContextEvent)
    {
            ObserverListener listen = new ObserverListener();
       //給“監控指定目錄下的線程”(被觀察者),添加一個觀察者 r.addObserver(listen);
new Thread(r).start(); // r.start(); log.info("ImportUserFromFileTask is started!"); } }

  4、主題對象:創建指定目錄文件變化監控類WatchFilePathTask,當

package com.zealer.ad.task;

import java.io.File;
import java.io.FileFilter;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Observable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;

import com.zealer.ad.util.ConfigUtils;
import com.zealer.ad.util.SpringUtils;

/**
 * 指定目錄文件變化監控類
 * @author cancer
 *
 */
public class WatchFilePathTask extends Observable implements Runnable
{
    private Log log = LogFactory.getLog(WatchFilePathTask.class);
    
    private static final String filePath = ConfigUtils.getInstance().getValue("userfile_path");
    
    private WatchService watchService;
    
    // 此方法一經調用,立馬可以通知觀察者,在本例中是監聽線程
    public void doBusiness()
  {
if(true)
     {
super.setChanged(); }
     //通知觀察者調用update()方法,重啟線程 notifyObservers(); } @Override
public void run() { try { //獲取監控服務 watchService = FileSystems.getDefault().newWatchService(); log.debug("獲取監控服務"+watchService); Path path = FileSystems.getDefault().getPath(filePath); log.debug("@@@:Path:"+path); final String todayFormat = DateTime.now().toString("yyyyMMdd"); File existFiles = new File(filePath); //啟動時檢查是否有未解析的符合要求的文件 if(existFiles.isDirectory()) { File[] matchFile = existFiles.listFiles(new FileFilter() { @Override public boolean accept(File pathname) { if((todayFormat+".txt").equals(pathname.getName())) { return true; } else { return false; } } }); if(null != matchFile) { for (File file : matchFile) { //找到符合要求的文件,開始解析 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask"); task.setFileName(file.getAbsolutePath()); task.start(); } } } //注冊監控服務,監控新增事件 WatchKey key = path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); while (true) { key = watchService.take(); for (WatchEvent<?> event : key.pollEvents()) { //獲取目錄下新增的文件名 String fileName = event.context().toString(); //檢查文件名是否符合要求 if((todayFormat+".txt").equals(fileName)) { String filePath = path.toFile().getAbsolutePath()+File.separator+fileName; log.info("import filePath:"+filePath); //啟動線程導入用戶數據 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");//new ImportUserFromFileTask(filePath); task.setFileName(filePath); task.start(); log.debug("啟動線程導入用戶數據"+task); } } key.reset(); } }
     catch (Exception e) { e.printStackTrace(); System.out.println("已經到這里來了"); doBusiness();//在拋出異常時調用,通知觀察者,讓其重啟線程 } } }

  5、創建解析用戶文件及導入數據庫線程,由WatchFilePathTask啟動

package com.zealer.ad.task;

import com.zealer.ad.entity.AutoPutUser;
import com.zealer.ad.entity.Bmsuser;
import com.zealer.ad.service.AutoPutUserService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.joda.time.DateTime;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import java.util.Date;

import javax.annotation.Resource;


/**
 * 解析用戶文件及入庫線程,由WatchFilePathTask啟動
 * @author cancer
 *
 */
public class ImportUserFromFileTask extends Thread {
    private Log log = LogFactory.getLog(ImportUserFromFileTask.class);
    private String fileName;
    @Resource(name = "autoPutUserService")
    private AutoPutUserService autoPutUserService;

    @Override
    public void run() {
        File file = new File(fileName);

        if (file.exists() && file.isFile()) {
            log.debug(":@@@准備開始休眠10秒鍾:" + file);

            //休眠十分鍾,防止文件過大還沒完全拷貝到指定目錄下,這里的線程就開始讀取文件
            try {
                sleep(10000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            InputStreamReader read;

            try {
                read = new InputStreamReader(new FileInputStream(file), "UTF-8");

                BufferedReader bufferedReader = new BufferedReader(read);
                String lineTxt = null;
                int count = 0;
                Boolean f = false;

                while ((lineTxt = bufferedReader.readLine()) != null) {
                    if ((null == lineTxt) || "".equals(lineTxt)) {
                        continue;
                    }

                    if (lineTxt.startsWith("'")) {
                        lineTxt = lineTxt.substring(1, lineTxt.length());
                    }

                    //解析分隔符為', '
                    String[] lines = lineTxt.split("', '");
                    int length = lines.length;

                    if (length < 2) {
                        continue;
                    }

                    Bmsuser bmsuser = new Bmsuser();
                    bmsuser.setName(lines[0]);if (!"".equals(lines[1])) {
                        bmsuser.setCity(lines[1]);
                    }
            //根據唯一索引已經存在的數據則不插入
                    f = autoPutUserService.insertIgnore(bmsuser);

                    if (f) {
                        count++;
                    }
                }

                //匯總數據
                AutoPutUser autoPutUser = new AutoPutUser();
                autoPutUser.setTotalCount(autoPutUserService.getUserCount());
                autoPutUser.setCount(count);
                autoPutUser.setCountDate(new Date(System.currentTimeMillis()));

                String today = DateTime.now().toString("yyyy-MM-dd");
                Integer oldCount = autoPutUserService.getOldCount(today);

                //如果今天導入過了就更新否則插入
                if (!oldCount.equals(0)) {
                    autoPutUserService.updateUserData(autoPutUser, today,
                        oldCount);
                } else {
                    autoPutUserService.gatherUserData(autoPutUser);
                }

                //注意:要關閉流
                read.close();
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }

            File newFile = new File(file.getPath() +
                    System.currentTimeMillis() + ".complate");
            file.renameTo(newFile);
        } else {
            log.error(fileName + " file is not exists");
        }
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public AutoPutUserService getAutoPutUserService() {
        return autoPutUserService;
    }

    public void setAutoPutUserService(AutoPutUserService autoPutUserService) {
        this.autoPutUserService = autoPutUserService;
    }
}

 

 

附帶:

1、sql腳本

CREATE TABLE `bmsuser` (
  `id` int(255) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(32) DEFAULT NULL , `city` varchar(32) DEFAULT NULL COMMENT , PRIMARY KEY (`bmsid`), UNIQUE KEY `bbLoginName` (`bbLoginName`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2、文件格式,命名為yyyyMMdd.txt

'張三', '深圳'

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM