一:簡單使用
from twisted.internet import defer from twisted.web.client import getPage from twisted.internet import reactor def one_done(arg): print(arg) def all_done(arg): print("all done") reactor.stop() @defer.inlineCallbacks def task(url): res = getPage(bytes(url,encoding="utf-8")) #獲取頁面,發送http請求,是使用select池將所有socket請求保存,依據此進行計數。 print('6',type(res)) #<class 'twisted.internet.defer.Deferred'> res.addCallback(one_done) #對每一個請求都添加一個回調方法 yield res #返回他 start_url_list = [ 'http://www.baidu.com', 'http://www.github.com', ] defer_list = [] for url in start_url_list: v = task(url) #發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求 print(v,type(v)) defer_list.append(v) d = defer.DeferredList(defer_list) #將上面的特殊對象列表一起放入DeferredList d.addBoth(all_done) #為所有對象添加回調 reactor.run() #會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done
二:模塊了解,getPage創建連接,放入select池中,進行計數,在事件循環時依據計數進行關閉(所以執行后自動關閉)
from twisted.internet import defer from twisted.web.client import getPage from twisted.internet import reactor def one_done(arg): print(arg) def all_done(arg): print("all done") reactor.stop() @defer.inlineCallbacks def task(url): res = getPage(bytes(url,encoding="utf-8")) #獲取頁面,發送http請求,創建socket res.addCallback(one_done) #對每一個請求都添加一個回調方法 yield res #返回他 res = getPage(bytes(url,encoding="utf-8")) #獲取頁面,發送http請求 res.addCallback(one_done) #對每一個請求都添加一個回調方法 yield res #返回他 start_url_list = [ 'http://www.baidu.com', 'http://cn.bing.com', ] defer_list = [] for url in start_url_list: v = task(url) #發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求 print(v,type(v)) defer_list.append(v) d = defer.DeferredList(defer_list) #將上面的特殊對象列表一起放入DeferredList d.addBoth(all_done)#為所有對象添加回調 reactor.run()#會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done
三:Deferred創建一個特殊socket對象,不放人select池,不發送請求,需要我們自己去終止
# coding:utf8 # __author: Administrator # date: 2018/6/28 0028 # /usr/bin/env python from twisted.internet import defer from twisted.web.client import getPage from twisted.internet import reactor def one_done(arg): print(arg) def all_done(arg): print("all done") reactor.stop() @defer.inlineCallbacks def task(): url = "http://www.baidu.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 url = "http://cn.bing.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 url = "http://www.cctv.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 yield defer.Deferred() defer_list = [] v = task() # 發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求 defer_list.append(v) d = defer.DeferredList(defer_list) # 將上面的特殊對象列表一起放入DeferredList d.addBoth(all_done) # 為所有對象添加回調 reactor.run() # 會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done
注意:
會執行每個getPage的回調 ,不會執行所有請求的公共回調,所有可以在每個的回調中進行處理,讓他(將他設為全局)暫停。
count = 0 _close = None def one_done(arg): print(arg) global count count += 1 if count == 3: #callback(None)會停止Deferred對象 _close.callback(None) #可以知道獲取的響應數據是在事件循環中才去read,task方法只是創建寫入socket def all_done(arg): print("all done") reactor.stop() @defer.inlineCallbacks #協程預激活器,在裝飾器內部先使用send(None)激活協程 def task(): url = "http://www.baidu.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 url = "http://cn.bing.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 url = "http://www.cctv.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 global _close _close = defer.Deferred() yield _close defer_list = [] v = task() # 發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求 defer_list.append(v) d = defer.DeferredList(defer_list) # 將上面的特殊對象列表一起放入DeferredList d.addBoth(all_done) # 為所有對象添加回調 reactor.run() # 會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done
四:簡單模擬一次請求
from twisted.internet import defer
from twisted.internet.defer import Deferred
_close = None
def one_done(arg):
print(arg)
_close.callback(None) #將Deferred對象停止,之后才會去執行all_done,在all_done中終止掉事件循環
def all_done(arg):
print("all done")
reactor.stop()
def coro_active(func):
def inner(*args,**kwargs):
gen = func(*args,**kwargs)
deferred = Deferred()
global _close
_close = deferred
result = gen.send(None) #預激活完成,執行到yield res
if isinstance(result, Deferred):
# a deferred was yielded, get the result.
return deferred #返回一個特殊socket對象,不會發送請求,阻塞在這,在事件循環中,會被自動停止
return inner
@coro_active #協程預激活器
def task():
url = "http://www.baidu.com"
res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求
#內部對getPage的deferred進行了計數(是使用了隊列),當運行一個getPage就會減一self._cancellations-=1,當為0時會退出循環,但是當我們自己返回一個deferred對象,則該對象計數不會自動去銷毀,導致一直處於IO循環
res.addCallback(one_done) # 對每一個請求都添加一個回調方法
yield res
defer_list = []
v = task() # 發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求
print(v)
defer_list.append(v)
d = defer.DeferredList(defer_list) # 將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done) # 為所有對象添加回調
reactor.run()
注意:在上面正常使中yield getPage(...),@defer.inlineCallbacks預激活器也是返回了一個Deferred對象,他是否在預激活器中使用了一樣的方法。
五:Scrapy模擬
from twisted.internet import defer from twisted.web.client import getPage from twisted.internet import reactor count = 0 _close = None def one_done(arg): print(arg) global count count += 1 if count == 6: _close.callback(None) pass def all_done(arg): print("all done") reactor.stop() @defer.inlineCallbacks def task(): #一個task相當於一個爬蟲 #看做是遞歸的 #第一個是start_url url = "http://www.baidu.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 #下面兩個可以看做parse中yield Request獲取的請求 url = "http://cn.bing.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 url = "http://www.cctv.com" res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求 res.addCallback(one_done) # 對每一個請求都添加一個回調方法 #若是任務沒有結束,Deferred對象就一直hold住事件循環,當任務結束,事件循環停止 global _close if not _close: _close = defer.Deferred() yield _close spider1 = task() spider2 = task() #兩個爬蟲都是並發的,內部請求也是並發的 d = defer.DeferredList([spider1,spider2]) # 將上面的特殊對象列表一起放入DeferredList d.addBoth(all_done) # 為所有對象添加回調 reactor.run() # 會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done
六:Lower版Scrapy
from twisted.internet import defer from twisted.internet import reactor from twisted.web.client import getPage import queue #放置url,是調度器 Q = queue.Queue() #定義的請求類,封裝請求數據 class Request: def __init__(self,url,callback): self.url = url self.callback = callback #定義響應類,將獲取的數據進行封裝處理 class HttpResponse: def __init__(self,content,request): self.content = content self.request = request self.url = request.url self.text = str(content,encoding="utf-8") #定義的爬蟲 class ChoutiSpider: name = "chouti" start_url = ["http://www.baidu.com","http://www.baidu.com"] def start_request(self): for url in self.start_url: yield Request(url,self.parse) def parse(self,response): print(response.text) yield Request("http://www.baidu.com",callback=self.parse) #引擎中心 class Engine: def __init__(self): self._close = None self.max = 5 #最大並發數限制 self.crawlling = [] #我們不直接去調用用戶的回調函數,我們先調用自定義的回調方法,在這個方法中去調用用戶的回調,所以我們可以完善用戶的方法 def get_response_callback(self,content,request): #request中含有請求,content是響應 # 1.crawlling移除 # 2.獲取parse yield值 # 3.再次去隊列中獲取 self.crawlling.remove(request) req = HttpResponse(content=content,request=request) result = request.callback(req) #若是回調parse中只有輸出,返回None,有yield則返回生成器 import types if isinstance(result,types.GeneratorType): #若是生成器,代表用戶又發送了請求,我們需要去迭代獲取請求,放入調度器,等待獲取 for req in result: #多個yield Q.put(req) def _next_request(self): if Q.qsize() == 0 and len(self.crawlling) == 0: self._close.callback(None) return if len(self.crawlling) >= self.max: return while len(self.crawlling) < self.max: try: req = Q.get(block=False) self.crawlling.append(req) d = getPage(req.url.encode("utf-8")) #任務下載完成,get_response_callback,調用用戶spider中定義的parse方法,並且將新請求添加到調度器 d.addCallback(self.get_response_callback,req) #因為上面處理將移除一個請求,所以現在未達到最大並發數,可以再去調度器中獲取Request # d.addCallback(self._next_request) #上面的addCallback可能會在向隊列中發送請求 d.addCallback(lambda _:reactor.callLater(0,self._next_request)) #可以交給reactor調用,內部維護,防止死循環 except Exception as e: return @defer.inlineCallbacks def crawl(self,spider): #開始進行下載 #將初始Request對象添加到調度器 start_request = iter(spider.start_request()) while True: try: request = next(start_request) #next激活,獲取request對象,兩步含有url和回調方法,我們將其放入調度器,等待去下載數據 Q.put(request) except StopIteration as e: break #反復去調度器中去任務,並發送請求,下載數據 # self._next_request() reactor.callLater(0, self._next_request) #0秒后調用_next_request方法 self._close = defer.Deferred() yield self._close _active = set() engine = Engine() spider = ChoutiSpider() d = engine.crawl(spider) #由於返回的是Deferred對象,會阻塞在此,等待close后進行 _active.add(d) dd = defer.DeferredList(_active) dd.addBoth(lambda _:reactor.stop()) reactor.run()
七:完善版Scrapy
from twisted.internet import defer from twisted.internet import reactor from twisted.web.client import getPage from queue import Queue #定義的請求類,封裝請求數據 class Request: def __init__(self,url,callback): self.url = url self.callback = callback #定義響應類,將獲取的數據進行封裝處理 class HttpResponse: def __init__(self,content,request): self.content = content self.request = request self.url = request.url self.text = str(content,encoding="utf-8") class Scheduler(object): """ 任務調度器 """ def __init__(self): self.q = Queue() def open(self): pass def next_request(self): try: req = self.q.get(block=False) except Exception as e: req = None return req def enqueue_request(self,req): self.q.put(req) def size(self): return self.q.qsize() class ExecutionEngine: '''引擎:負責所有調度''' def __init__(self): self._close = None self.scheduler = None self.max = 5 self.crawlling = [] def get_response_callback(self,content,request): self.crawlling.remove(request) response = HttpResponse(content,request) result = request.callback(response) import types if isinstance(result,types.GeneratorType): for req in result: self.scheduler.enqueue_request(req) def _next_request(self): if self.scheduler.size() == 0 and len(self.crawlling) == 0: self._close.callback(None) return while len(self.crawlling) < self.max: req = self.scheduler.next_request() if not req: return self.crawlling.append(req) d = getPage(req.url.encode('utf-8')) d.addCallback(self.get_response_callback,req) d.addCallback(lambda _:reactor.callLater(0,self._next_request)) @defer.inlineCallbacks def open_spider(self,start_requests): self.scheduler = Scheduler() yield self.scheduler.open() #yield None只是為了不報錯,因為在defer.inlineCallbacks需要當前函數是生成器 while True: try: req = next(start_requests) except StopIteration as e: break self.scheduler.enqueue_request(req) reactor.callLater(0,self._next_request) @defer.inlineCallbacks def start(self): self._close = defer.Deferred() yield self._close class Crawler: '''用戶封裝調度器以及引擎''' def _create_engine(self): return ExecutionEngine() def _create_spider(self,spider_cls_path): module_path,cls_name = spider_cls_path.rsplit(".",maxsplit=1) import importlib m = importlib.import_module(module_path) cls = getattr(m,cls_name) return cls() @defer.inlineCallbacks def crawl(self,spider_cls_path): engine = self._create_engine() print(engine) #每個爬蟲創建一個引擎,一個調度器,去並發爬蟲中的請求 spider = self._create_spider(spider_cls_path) start_requests = iter(spider.start_requests()) yield engine.open_spider(start_requests) yield engine.start() class CrawlerProcess: '''開啟事件循環''' def __init__(self): self._active = set() def crawl(self,spider_cls_path): crawler = Crawler() d = crawler.crawl(spider_cls_path) self._active.add(d) def start(self): dd = defer.DeferredList(self._active) dd.addBoth(lambda _:reactor.stop()) reactor.run() class Command: '''命令''' def run(self): crawl_process = CrawlerProcess() spider_cls_path_list = ['spider.chouti.ChoutiSpider','spider.baidu.BaiduSpider'] for spider_cls_path in spider_cls_path_list: crawl_process.crawl(spider_cls_path) crawl_process.start() if __name__ == "__main__": cmd = Command() cmd.run()
spider目錄下兩個爬蟲

from engine import Request class ChoutiSpider(object): name = 'chouti' def start_requests(self): start_url = ['http://www.baidu.com','http://www.bing.com',] for url in start_url: yield Request(url,self.parse) def parse(self,response): print(response) #response是下載的頁面 yield Request('http://www.cnblogs.com',callback=self.parse)

from engine import Request class CnblogsSpider(object): name = 'baidu' def start_requests(self): start_url = ['http://www.baidu.com',] for url in start_url: yield Request(url,self.parse) def parse(self,response): print(response) #response是下載的頁面 yield Request('http://www.baidu.com',callback=self.parse)