quartz結合多線程處理后台業務


  最近項目中有播放視頻的需求,技術選型采用UMS播放器,免費版只能播放FLV格式的視頻文件,因此需要對用戶上傳的視頻進行格式轉換,轉換工具為FormatFactory,功能還是比較強大的。但是面臨的一個問題,視頻轉換是非常耗時的,上傳完直接轉換是沒法接受的,於是決定采用quartz,以任務調度的方式,在后台進行轉換,具體步驟如下:

  1.定義一個任務隊列,將待轉換的視頻文件信息放到隊列中。采用單例模式,並且考慮到線程安全問題,采用線程安全的Vector作為隊列容器:

  

復制代碼
/**
 * 格式轉換任務隊列
 * 隊列中放的是ResourceInfo類型對象
 * @author Administrator
 *
 */
public class TransformTaskQueue {
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> TransformTaskQueue instance = <span style="color: #0000ff;">null</span><span style="color: #000000;">;

</span><span style="color: #008000;">//</span><span style="color: #008000;">實際存放轉換對象信息的隊列,采用線程安全的Vercor容器</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> Vector&lt;ResourceInfo&gt; taskQueue = <span style="color: #0000ff;">new</span> Vector&lt;ResourceInfo&gt;<span style="color: #000000;">();

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span><span style="color: #000000;"> TransformTaskQueue getInstance() {
    </span><span style="color: #0000ff;">if</span> (instance == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
        instance </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> TransformTaskQueue();
    }
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> instance;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * 向隊列中添加對象
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> info
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> add(ResourceInfo info) {
    taskQueue.add(info);
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * 從隊列中刪除對象
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> info
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> remove(ResourceInfo info) {
    </span><span style="color: #0000ff;">if</span>(taskQueue.size()&gt;0 &amp;&amp;<span style="color: #000000;"> taskQueue.contains(info)){
        taskQueue.remove(info);
    }
}

}

復制代碼

  2.用戶上傳視頻文件之后,后台進行判斷,如果不是flv格式,則將文件轉換所需信息封裝到ResuorceInfo對象,將該對象放入待轉換隊列:

復制代碼
// 如果源視頻文件存在,則進行相應的轉換,轉換為FLV文件
        if (new File(TransConfig.VIDEO_SOURCE_ROOT + path + fileName).exists()) {
        ResourceInfo info </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> ResourceInfo();
        info.setResourceId(resourceId);
        info.setPath(path);
        info.setFileName(fileName);
        info.setStatus(</span>0<span style="color: #000000;">);
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 添加到轉換隊列</span>

TransformTaskQueue.add(info);

    } </span><span style="color: #0000ff;">else</span><span style="color: #000000;"> {
        System.out.println(</span>"源文件不存在!"<span style="color: #000000;">);
    }</span></pre>
復制代碼

  3.執行單個具體文件轉換的操作類代碼如下:

復制代碼
/**
 * 執行具體轉換操作的類,
 * 采用多線程技術,繼承了runnable接口
 * @author Administrator
 *
 */
public class TransformExecutor implements Runnable,Serializable{
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">final</span> <span style="color: #0000ff;">long</span> serialVersionUID = 1L<span style="color: #000000;">;

</span><span style="color: #0000ff;">private</span> ResourceInfo info = <span style="color: #0000ff;">null</span><span style="color: #000000;"> ;

</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> TransformExecutor(ResourceInfo info){
    </span><span style="color: #0000ff;">this</span>.info =<span style="color: #000000;"> info;
}

@Override
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> run() {
    
    String resourceId </span>=<span style="color: #000000;"> info.getResourceId();
    String path </span>=<span style="color: #000000;"> info.getPath();
    String fileName </span>=<span style="color: #000000;"> info.getFileName();

    String videoFilename </span>= TransConfig.VIDEO_SOURCE_ROOT +<span style="color: #000000;"> path
            </span>+<span style="color: #000000;"> fileName;
    String flvFilename </span>=<span style="color: #000000;"> path
            </span>+ FileUtil.getFilePrefix(fileName) + ".flv"<span style="color: #000000;">;

    </span><span style="color: #008000;">//</span><span style="color: #008000;"> 轉換成功,修改數據庫中的is_transed字段為1</span>
    <span style="color: #0000ff;">if</span> (Video2FLVTransfer.transform(videoFilename, flvFilename) == 1<span style="color: #000000;">) {
        CRUDUtil.update(resourceId, </span>1<span style="color: #000000;">);
    }
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> 轉換失敗,修改數據庫中的is_transed字段為2</span>
    <span style="color: #0000ff;">else</span><span style="color: #000000;"> {
        CRUDUtil.update(resourceId, </span>2<span style="color: #000000;">);
    }
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> 將resourceInfo從轉換隊列中去除</span>

TransformTaskQueue.remove(info);

}

}

復制代碼

  4.下面是開啟多線程轉換的操作類,采用線程池技術,因為轉換視頻文件格式工作量比較大,因此規定每次最多開啟3個線程:

復制代碼
/**
 * 轉換執行器服務類
 * @author Administrator
 *
 */
public class TransExecutorService {
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">final</span><span style="color: #000000;"> ExecutorService pool;

</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span><span style="color: #000000;"> TransExecutorService instance;
</span><span style="color: #008000;">//</span><span style="color: #008000;">線程池大小,即每次最多允許開啟幾個線程執行轉換操作</span>
<span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">final</span> <span style="color: #0000ff;">int</span> THREAD_SIZE = 3<span style="color: #000000;">;

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span><span style="color: #000000;"> TransExecutorService getInstance() {
    </span><span style="color: #0000ff;">if</span> (instance == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
        instance </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> TransExecutorService();
    }
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> instance;
}

</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> TransExecutorService() {

// pool = Executors.newCachedThreadPool();
pool = Executors.newFixedThreadPool(THREAD_SIZE);
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * 開啟新線程,執行轉換操作
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> info
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> execute(ResourceInfo info) {
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
        pool.submit(</span><span style="color: #0000ff;">new</span><span style="color: #000000;"> TransformExecutor(info));
    } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (Exception e) {
        e.printStackTrace();
    }
}

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">synchronized</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> shutdown() {
    pool.shutdown();
}

}

復制代碼

  5.調度任務實現類,即每次執行調度,執行的操作

復制代碼
/**
 * 調度任務具體執行類
 * @author Administrator
 *
 */
public class TransformJob implements Job {
@Override
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span> execute(JobExecutionContext ctx) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> JobExecutionException {
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;">獲取當前待轉換視頻文件隊列</span>
    Vector&lt;ResourceInfo&gt; infos =<span style="color: #000000;"> TransformTaskQueue.getInstance().taskQueue;
    System.out.println(</span>"size:"+<span style="color: #000000;">infos.size());
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;">如果任務隊列中存在待轉換對象,則進行轉換</span>
    <span style="color: #0000ff;">if</span> (infos.size() &gt; 0<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">for</span> (<span style="color: #0000ff;">int</span> i=0;i&lt;infos.size();i++<span style="color: #000000;">) {
            </span><span style="color: #008000;">//</span><span style="color: #008000;">status為0,表示不是正在轉換中的</span>
            <span style="color: #0000ff;">if</span> (infos.get(i).getStatus() == 0<span style="color: #000000;">) {
                infos.get(i).setStatus(</span>1<span style="color: #000000;">);
                </span><span style="color: #008000;">//</span><span style="color: #008000;">新開線程,執行轉換操作</span>

TransExecutorService.getInstance().execute(infos.get(i));
}
}
}
}

}

復制代碼

  6.任務調度管理類,規定了調度執行的一些規則,其中定時表達式請自行網上搜索,這里采用的是每10秒執行一次。

復制代碼
/**
 * 格式轉換任務調度管理類
 * 
 * @author Administrator
 * 
 */
public class SchedulManager {
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> SchedulManager instance = <span style="color: #0000ff;">new</span><span style="color: #000000;"> SchedulManager();
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> Scheduler scheduler;
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">volatile</span> <span style="color: #0000ff;">boolean</span> start = <span style="color: #0000ff;">false</span><span style="color: #000000;">;

</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> SchedulManager() {
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
        SchedulerFactory factory </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> StdSchedulerFactory();
        scheduler </span>=<span style="color: #000000;"> factory.getScheduler();
    } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (SchedulerException e) {
        e.printStackTrace();
    }
}

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span><span style="color: #000000;"> SchedulManager getInstance() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> instance;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * 開始執行,將加載調度配置並啟動每個調度。
 * 
 * @注意: 一般在程序啟動時調用該方法。
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> execute() {
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 加載調度配置,並啟動每個調度。</span>

scheduleJobs();
scheduler.start();
}
catch (Exception e) {
e.printStackTrace();
}
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * 加載調度配置並啟動每個調度
 * 
 * @注意: TODO
 </span><span style="color: #008000;">*/</span><span style="color: #000000;">
@SuppressWarnings(</span>"static-access"<span style="color: #000000;">)
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> scheduleJobs() {

    Vector</span>&lt;ResourceInfo&gt; infos =<span style="color: #000000;"> TransformTaskQueue.getInstance().taskQueue;
    System.out.println(</span>"size:" +<span style="color: #000000;"> infos.size());
    </span><span style="color: #0000ff;">if</span> (infos.size() &gt; 0<span style="color: #000000;">) {
        start();
    }
    start </span>= <span style="color: #0000ff;">true</span><span style="color: #000000;">;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * 根據ResourceInfo啟動一個調度
 * 
 * @注意: 內部方法,外部不能調用
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> ResourceInfo
 *            資源信息
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> start() {

    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> String id = info.getResourceId();
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 構造方法中 第一個是任務名稱 ,第二個是任務組名,第三個是任務執行的類</span>
        JobDetail jobDetail = <span style="color: #0000ff;">new</span> JobDetail("video_trans_id"<span style="color: #000000;">,
                Scheduler.DEFAULT_GROUP, TransformJob.</span><span style="color: #0000ff;">class</span><span style="color: #000000;">);
        String cronExpr </span>= "0/10 * * * * ?"<span style="color: #000000;">;
        String triggerName </span>= "video_trans_trigger"<span style="color: #000000;">;
        Trigger trigger </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> CronTrigger(triggerName,
                Scheduler.DEFAULT_GROUP, cronExpr);

        scheduler.scheduleJob(jobDetail, trigger);

    } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (Exception e) {
        System.out.println(</span>"出錯"<span style="color: #000000;">);
        e.printStackTrace();
    }
}

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> init() {

    SchedulManager sm </span>=<span style="color: #000000;"> SchedulManager.getInstance();
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> sm.start(info);
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> sm.scheduleJobs();</span>

sm.start();
try {
sm.scheduler.start();
}
catch (SchedulerException e) {
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
}

復制代碼

  7.通過上述6個步驟,已經可以通過quartz以任務調度的形式來進行格式轉換了,接下來的問題,是寫一個listener類,以實現在服務器啟動的時候,任務調度自動啟動。

首先需要在web.xml中加入如下配置:

    <listener> 
        <listener-class>com.yunda.web.EventTransformStartupListener</listener-class> 
    </listener>

之后就是實現配置文件中的實現監聽功能的類,非常簡單,就是調用SchedulManager中的init()方法即可,代碼如下:

復制代碼
public class EventTransformStartupListener implements ServletContextListener {
@Override
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> contextDestroyed(ServletContextEvent arg0) {
}

@Override
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> contextInitialized(ServletContextEvent arg0) {
    
    System.out.println(</span>"init..."<span style="color: #000000;">);
    
    SchedulManager sm </span>=<span style="color: #000000;"> SchedulManager.getInstance();
    </span><span style="color: #008000;">//</span><span style="color: #008000;">sm.start(info);</span>

sm.init();
}
}

復制代碼

  至此,后台進行格式轉換的功能全部完成,通過做這個功能,發現quartz采用的任務調度機制,跟linux的crontab差不多,也是采用定時掃描的方法來完成,連定時表達式的規則都長的差不多。先寫這么多吧,就是學習了quartz和多線程的簡單用法,留個筆記,以便日后深究^_^


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM