如何使用Nodejs進行批量下載


0x1 Nodejs登場

Nodejs是一款基於谷人希的V8引擎開發javascript運行環境。在高性能的V8引擎以及事件驅動的單線程異步非阻塞運行模型的支持下,Nodejs實現的web服務可以在沒有Nginx的http服務器做反向代理的情況下實現很高的業務並發量(當然了配合Nginx食用風味更佳)。

0x2 准備工作

現在我們假設你的爬蟲已經幫你爬到了一堆圖片的鏈接,然后你的nodejs腳本以某種方式(接收post http請求,進程間通信,讀寫文件或數據庫等等。。。)獲得了這些鏈接,這里我用某款大型角色扮演網絡游戲的官網上提供的壁紙鏈接為例子(這里似乎並沒有為一款運營10年經久不衰的游戲打廣告的意思,僅僅只是情懷溢出。。。)

(function() {
  "use strict";
  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];
})();

我們可以對urlList執行一個遍歷來依次下載這些圖片,確切的說是依次啟動下載這些鏈接的任務。

(function() {
  //略...

  var startDownloadTask = function(imgSrc, dirName, index) {
    //TODO: startDownloadTask
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

startDownloadTask這個函數就是用來下載這些圖片的。其中imgSrc是圖片的鏈接,dirName是我們存放下載后的圖片的路徑,index是圖片鏈接在列表中的序號。我們在這個函數中,會調用Nodejs的系統Apihttp.request來完成下載工作,由於該Api和大多數Nodejs的Api一樣是異步非阻塞模式,所以startDownloadTask函數調用該Api后不會等待下載完成,就會立即返回。在下載的過程中,以及完成之后,或者發生異常時,系統會調用http.request的回掉函數來做相應的處理。我們接下來會看到該Api的詳細聲明和用法,在了解了該Api的使用方法之后,就可以用它來實現startDownloadTask函數。

0x3 http.request的聲明和使用方法

我們在Nodejs的官方文檔上可以找到http.request的完整聲明和各個參數的說明。它的聲明如下:

http.request(options[, callback])

其中options可以是帶有請求的目的地址的一條字符串,亦可以是一系用於發起請求的列詳細參數,用於對請求進行更精確的控制。我們現在暫時不需要這些精確的參數控制,直接傳入圖片的鏈接就可以。

至於callback參數就是剛才說到的回調函數,這是個非常重要的函數,圖片下載下來后能否存入我們指定的位置可全靠它。這個回調函數會接受一個入參,文檔中對這個入參沒有詳細說明,通過后面的例子我們發現,這個叫res的入參監聽了兩個事件,分別是dataend事件,並且還有一個setEncoding方法,並且還有statusCodeheaders兩個成員屬性。熟悉Nodejs Api的同學不難猜出,這個res其實是一個stream.Readable類型的子類的變量,那兩個事件監聽和setEncoding方法就是繼承自這個類型,而那兩個成員屬性是子類擴展的。這並沒有什么意外的,在其他語言的類庫中,http請求Api返回一個可讀數據流是很常見的做法。仔細閱讀文檔的其他部分后可以發現,這個res的真實類型是http.IncomingMessage。這里不得不對這種不寫明每個參數的類型的文檔提出批評,像javascript這種動態弱類型腳本語言,開發者要想知道一個Api各個參數和返回值有可能是什么類型,拿過來怎么處理可全靠文檔啊。

介紹完了入參,再來看看http.request會返回什么。文檔中說它會返回一個http.ClientRequest類型的變量,這個變量可以接受error事件,來對請求異常的情況進行處理。

剛才說過,這個Api是一個異步接口,調用這個Api之后會立即返回一個http.ClientRequest類型變量,假設變量名為req。但這時候不會馬上發起請求。我們這時候可以設置reqerror事件的監聽回調,如果是POST請求的話,還可以調用req.write方法來設置請求消息體,然后調用req.end方法來結束此次請求的發送過程。當收到響應時(嚴格的說是確認接收完響應頭時),就會調用callback回調函數,在這個回調函數中,可以通過讀取res.statusCoderes.headers獲取響應的返回狀態碼和頭部信息,其中頭部信息包含了重要的字段content-length,表示響應消息體的總長度。由於響應消息體可能很長,服務端需要把消息體拆分成多個tcp封包來發送,客戶端在接收到tcp封包后還要進行消息體的重組,所以這里采用一個數據流對象來對返回的消息體做讀取操作,需要注冊dataend事件監聽,分別處理鏈路層緩沖區接收了若干字節的消息體封包並且拼接完成回調上層協議處理和tcp連接拆線時的事務。

Api聲明后面附帶了一個例子,比較簡單不難看懂,這里就不詳細說了。

0x4 實現startDownloadTask

了解了http.request的基本使用方法,以及看過例子之后,我們很快就能寫出一個簡單的下載過程了:

(function() {
  "use strict";
  const http = require("http");

  //略...

  function getHttpReqCallback(imgSrc, dirName, index) {
    var callback = function(res) {
      // TODO: callback回調函數實現
    };

    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){});
    req.end();
  }

  //略
})();

我暫且先忽略了請求的錯誤處理。這里需要講解的是函數getHttpReqCallback,這個函數本身不是回調函數,在調用http.request時會先調用它,它返回了一個閉包callback,作為http.request的回調函數。我很快會解釋為什么需要這樣寫。

接下來我們來實現這個回調函數:

(function() {
  "use strict";
  const http = require("http");
  const fs = require("fs");
  const path = require("path");
  //略...

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {      
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        var totalBuff = Buffer.concat(fileBuff);      
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });        
    };

    return callback;
  }

  //略
})();

這里的callback函數的邏輯目前為止還不是很復雜,resdata事件的回調函數中,chunk參數是從可讀數據流中讀出的數據,將其轉換為Buffer對象后插入fillBuff數組以待后用。

resend事件意味着鏈路層鏈接拆除,數據接收完畢,在該事件的回調中,我們通過Buffer.concat函數,將fileBuff中的所有Buffer對象依次重組為一個新的Buffer對象totalBuff,該對象既是接收到的完整的數據。之后通過fs.appendFile函數將totalBuff存入磁盤,存放路徑為dirName + "/" + fileName

於是我們就有了一個完整的勉強可以工作的腳本,完整的腳本代碼如下:

(function() {
  "use strict";
  const fs = require("fs");
  const http = require("http");
  const path = require("path");

  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {      
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        var totalBuff = Buffer.concat(fileBuff);
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });
    };
    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){});
    req.end();
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

之所以說它勉強可工作,是因為它完全沒有做錯誤處理,程序的健壯性幾乎為0,甚至連打印日志都沒有了,下載過程中一旦出現任何意外情況,那就自求多福吧。

但即使這樣一個漏洞百出的代碼,也還是有幾點需要特殊說明。

為什么要采用閉包?

因為實際上作為http.request的回調函數callback,它的聲明原型決定的它只可以接受唯一一個參數res,但是在callback函數中我們需要明確知道下載下來的數據在硬盤上存放的路徑,這個路徑取決於startDownloadTask的入參dirNameindex。所以函數getHttpReqCallback就是用於創建一個閉包,將dirNameindex的值寫入這個閉包中。
其實我們原本並不需要getHttpReqCallback這個函數來顯示的返回一個閉包,而是可以直接使用內聯匿名函數的方法實現http.requestcallback,代碼大概會寫成這樣:

var startDownloadTask = function(imgSrc, dirName, index) {
  var req = http.request(imgSrc, function(res) {
    var fileName = index + "-" + path.basename(imgSrc);
    var fileBuff = [];
    res.on('data', function (chunk) {
      var buffer = new Buffer(chunk);
      fileBuff.push(buffer);
    });
    res.on('end', function() {
      var totalBuff = Buffer.concat(fileBuff);
      fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
    });
  });
  req.on('error', function(e){});
  req.end();
}

這樣也可以工作,http.requestcallback直接訪問外層作用域的變量,即函數startDownloadTask的入參dirNameindex,這也是一個閉包。這樣寫的問題在於,一段異步代碼強行插入原本連貫的同步代碼中,也許現在你覺得這也沒什么,這是因為目前callback里還沒有處理任何的異常情況,所以邏輯比較簡單,這樣看起來也不算很混亂,但是我需要說的是,一旦后面加入了異常處理的代碼,這一塊看起來就會非常糟糕了。

為什么在data事件中要使用一個列表緩存接收到的所有數據,然后在end中一次性寫入硬盤?

首先要說的是,這里並不是出於通過減少寫磁盤次數達到提高性能或者延長磁盤壽命的目的,雖然可能確實有這樣的效果。根本原因在於,如果不采用一次性寫入,在nodejs的異步非阻塞運行機制下,這樣存入磁盤的數據會混亂,導致不堪入目的后果,比較直觀的情況見附錄。

在同步阻塞運行模型的語言中(java, c, python),確實存在將遠程連接傳輸過來的數據先緩存在內存里,待接收完整或或緩存了一定長度的數據之后再一次性寫入硬盤的做法,以達到減少寫磁盤操作次數的目的。但是如果在每一次從遠程連接接中讀取到數據之后立即將數據寫入硬盤,也不會有什么問題(tcp協議已經幫我們將數據包排好序),這是因為在同步阻塞運行模型中,讀tcp連接和寫磁盤這兩個動作必然不可能同時執行,而是讀tcp -> 寫磁盤 -> 讀tcp -> 寫磁盤...這樣的串行執行,在上一個操作完成之后,下一個操作才會開始。這樣的執行方式也許效率會比較低,但是寫入的磁盤的數據並不會混亂。

現在回到我們的異步非阻塞世界中來,在這個世界中,遠程讀取的操作是通過事件回調的方式發生的,resdata事件任何一個時間片內都可能觸發,你無法預知,無法控制,甚至觸發頻率都和你無關,那取決於本次連接的帶寬。而我們的寫磁盤操作fs.appendFile和Nodejs的大部分Api一樣是一個異步非阻塞的調用,它會非常快的返回,但是它所執行的寫文件操作,則會慢的多,而進程不會阻塞在那里等待這個操作完成。在常識里,遠程連接的下載速度比本地硬盤的寫入速度要慢,但這並不是絕對的,隨着網速的提高,現在一塊高速網卡,在高速的網絡中帶來的下載速度超過一塊老舊的機械硬盤的寫入速度並非不可能發生。除此之外,即使在較長的一段時間內,網絡的平均連接速度並沒有快的那么誇張,但是我們知道在tcp/ip協議棧中,鏈路層下層的網絡層中前后兩個ip報文的到達時間間隔也是完全無法確定的,有可能它們會在很短的時間間隔內到達,被tcp協議重組之后上拋給應用層協議,在我們的運行環境中以很短的間隔兩次觸發data事件,而這個間隔並不足夠磁盤將前一段數據寫入。

我畫個草圖來解釋到底發生什么事情:

|data1
|       |data2
|-----------------------------|  //<- write data1
|       |-----------------------------|  //<- write data2
|       |
|----------------------------------------------------------> time

此時要想寫入的數據保持有序不混亂,只能寄希望於機械硬盤的一面只有一個磁頭來從物理層面保證原子操作了。但是很可惜我們知道現代機械硬盤每一面至少都有兩個磁頭。

有着很多java或者c++編程經驗的你也許會想在這里加一個同步鎖,不過Nodejs作為一個表面宣稱的單線程環境(底層的V8引擎肯定還是有多線程甚至多進程調度機制實現的),在語法和Api層面並沒有鎖這個概念。

所以為了保證最終寫入磁盤的數據不混亂,在data事件的回調中不可以再用異步的方式處理數據了,於是有了現在這種先寫入緩存列表中,在數據接收完整后再一次性寫文件的做法。由於new Buffer(chunk)fileBuff.push(buffer)都是同步操作,並且執行的速度非常快;即使下一個data事件到來的比這兩個操作還要快,由於單線程運行模型的限制,也必須等待這兩個操作完成后才會開始第二次回調。所以能保證數據有序的緩存到內存中,再有序的寫入硬盤。

0x5 異常處理

剛才說到,目前為止我們的腳本雖然能夠正常工作,但是沒有異常處理,程序非常脆弱。由於異常處理是一個程序非常重要的部分,所以在這里我有義務要完成這部分代碼。

首先我們從最簡單的做起,打印一些日志來幫助調試程序。

(function() {
  //略。。
  function getHttpReqCallback(imgSrc, dirName, index) {
    var callback = function(res) {
      console.log("request: " + imgSrc + " return status: " + res.statusCode);
      //略。。
      res.on('end', function() {
        console.log("end downloading " + imgSrc);
        //略。。
      });
    };
    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    console.log("start downloading " + imgSrc);
    //略。。。
  }
})

接下來我們在reqerror事件中,進行重新下載嘗試的操作:

  var startDownloadTask = function(imgSrc, dirName, index) {
    //略。。
    req.on('error', function(e){
      console.log("request " + imgSrc + " error, try again");
      startDownloadTask(imgSrc, dirName, index);
    });
  }

這樣一旦在請求階段出現異常,會自動重新發起請求。你也可以在這里自行添加重試次數上限。

下面的代碼給請求設置了一個一分鍾的超時時間:

  var startDownloadTask = function(imgSrc, dirName, index) {
    //略。。
    req.setTimeout(60 * 1000, function() {
      console.log("reqeust " + imgSrc " timeout, abort this reqeust");
      req.abort();
    })
  }

一旦在一分鍾之內下載還沒有完成,則會強制終止此次請求,這會立即觸發resend事件。

req的異常處理大致就是這些,接下來是對res的異常處理。

我們首先需要獲取包體的總長度,該值在響應頭的content-length字段中:

function getHttpReqCallback(imgSrc, dirName, index) {
  var callback = function(res) {
    var contentLength = parseInt(res.headers['content-length']);
    //略。。
  }
}

end事件的回調中,用接收到的數據總長度和響應頭中的包體長度進行比較,驗證響應信息是否接收完全:

res.on('end', function() {
  console.log("end downloading " + imgSrc);
  if (isNaN(contentLength)) {
    console.log(imgSrc + " content length error");
    return;
  }
  var totalBuff = Buffer.concat(fileBuff);
  console.log("totalBuff.length = " + totalBuff.length + " " + "contentLength = " + contentLength);
  if (totalBuff.length < contentLength) {
    console.log(imgSrc + " download error, try again");
    startDownloadTask(imgSrc, dirName, index);
    return;
  }
  fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
}

0x6 結束

本文上面全部示例代碼如下:

(function() {
  "use strict";
  const http = require("http");
  const fs = require("fs");
  const path = require("path");

  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {
      console.log("request: " + imgSrc + " return status: " + res.statusCode);
      var contentLength = parseInt(res.headers['content-length']);
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        console.log("end downloading " + imgSrc);
        if (isNaN(contentLength)) {
          console.log(imgSrc + " content length error");
          return;
        }
        var totalBuff = Buffer.concat(fileBuff);
        console.log("totalBuff.length = " + totalBuff.length + " " + "contentLength = " + contentLength);
        if (totalBuff.length < contentLength) {
          console.log(imgSrc + " download error, try again");
          startDownloadTask(imgSrc, dirName, index);
          return;
        }
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });
    };

    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    console.log("start downloading " + imgSrc);
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){
      console.log("request " + imgSrc + " error, try again");
      startDownloadTask(imgSrc, dirName, index);
    });
    req.end();
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

本人在Nodejs方面也是完全的新手,沒有太深入的研究Nodejs內部的運行機制,只是網上讀過幾篇文章,用Nodejs寫過一些簡短的腳本,在這個過程中掉過一些坑,本文就是一次印象深刻的爬坑過程的整理和總結。總的來說,Nodejs是一個非常強大且有趣的工具,但是由於其獨特的運行模型,以及javascript自身也有不少的歷史遺留問題需要解決,所以對於長期以來習慣了java, c/c++, python一類思維方式的猿們剛剛接觸它的時候產生不少疑惑,希望本文能幫助大家理解Nodejs中的一些不同於其他語言的和運行環境的地方。




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM