python---twisted的使用,使用其模擬Scrapy


twisted的網絡使用

twisted的異步使用

一:簡單使用

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor

def one_done(arg):
    print(arg)

def all_done(arg):
    print("all done")
    reactor.stop()

@defer.inlineCallbacks
def task(url):
    res = getPage(bytes(url,encoding="utf-8"))  #獲取頁面,發送http請求,是使用select池將所有socket請求保存,依據此進行計數。
    print('6',type(res))    #<class 'twisted.internet.defer.Deferred'>
    res.addCallback(one_done)   #對每一個請求都添加一個回調方法
    yield res   #返回他

start_url_list = [
    'http://www.baidu.com',
    'http://www.github.com',
]

defer_list = []
for url in start_url_list:
    v = task(url)   #發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求
    print(v,type(v))
    defer_list.append(v)

d = defer.DeferredList(defer_list)  #將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done)  #為所有對象添加回調

reactor.run()  #會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done

二:模塊了解,getPage創建連接,放入select池中,進行計數,在事件循環時依據計數進行關閉(所以執行后自動關閉)

from twisted.internet import defer from twisted.web.client import getPage from twisted.internet import reactor

def one_done(arg):
    print(arg)

def all_done(arg):
    print("all done")
    reactor.stop()

@defer.inlineCallbacks
def task(url):
    res = getPage(bytes(url,encoding="utf-8"))  #獲取頁面,發送http請求,創建socket
    res.addCallback(one_done)   #對每一個請求都添加一個回調方法
    yield res   #返回他

    res = getPage(bytes(url,encoding="utf-8"))  #獲取頁面,發送http請求
    res.addCallback(one_done)   #對每一個請求都添加一個回調方法
    yield res   #返回他


start_url_list = [
    'http://www.baidu.com',
    'http://cn.bing.com',
]

defer_list = []
for url in start_url_list:
    v = task(url)   #發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求
    print(v,type(v))
    defer_list.append(v)

d = defer.DeferredList(defer_list)  #將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done)#為所有對象添加回調

reactor.run()#會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done

 

 

三:Deferred創建一個特殊socket對象,不放人select池,不發送請求,需要我們自己去終止

# coding:utf8
# __author:  Administrator
# date:      2018/6/28 0028
# /usr/bin/env python
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


def one_done(arg):
    print(arg)


def all_done(arg):
    print("all done")
    reactor.stop()


@defer.inlineCallbacks
def task():
    url = "http://www.baidu.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    url = "http://cn.bing.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    url = "http://www.cctv.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    yield defer.Deferred()


defer_list = []
v = task()  # 發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求
defer_list.append(v)

d = defer.DeferredList(defer_list)  # 將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done)  # 為所有對象添加回調

reactor.run()  # 會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done

 注意:

 會執行每個getPage的回調 ,不會執行所有請求的公共回調,所有可以在每個的回調中進行處理,讓他(將他設為全局)暫停。

count = 0
_close = None

def one_done(arg):
    print(arg)
    global count
   count += 1 if count == 3:  #callback(None)會停止Deferred對象 _close.callback(None)  #可以知道獲取的響應數據是在事件循環中才去read,task方法只是創建寫入socket

def all_done(arg):
    print("all done")
    reactor.stop()


@defer.inlineCallbacks  #協程預激活器,在裝飾器內部先使用send(None)激活協程
def task():
    url = "http://www.baidu.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    url = "http://cn.bing.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    url = "http://www.cctv.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    global _close
    _close = defer.Deferred()
    yield _close


defer_list = []
v = task()  # 發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求
defer_list.append(v)

d = defer.DeferredList(defer_list)  # 將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done)  # 為所有對象添加回調

reactor.run()  # 會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done

四:簡單模擬一次請求

from twisted.internet import defer
from twisted.internet.defer import Deferred

_close = None

def one_done(arg):
print(arg)
_close.callback(None)  #將Deferred對象停止,之后才會去執行all_done,在all_done中終止掉事件循環

def all_done(arg):
print("all done")
reactor.stop()

def coro_active(func):
def inner(*args,**kwargs):
gen = func(*args,**kwargs)
deferred = Deferred()
global _close
_close = deferred
result = gen.send(None) #預激活完成,執行到yield res

if isinstance(result, Deferred):
# a deferred was yielded, get the result.
return deferred #返回一個特殊socket對象,不會發送請求,阻塞在這,在事件循環中,會被自動停止
return inner

@coro_active  #協程預激活器
def task():
url = "http://www.baidu.com"
res = getPage(bytes(url, encoding="utf-8")) # 獲取頁面,發送http請求
#內部對getPage的deferred進行了計數(是使用了隊列),當運行一個getPage就會減一self._cancellations-=1,當為0時會退出循環,但是當我們自己返回一個deferred對象,則該對象計數不會自動去銷毀,導致一直處於IO循環
res.addCallback(one_done) # 對每一個請求都添加一個回調方法

yield res


defer_list = []
v = task() # 發送請求后立即返回,不等待返回,v是一個特殊對象,標志你發送到那個請求
print(v)
defer_list.append(v)

d = defer.DeferredList(defer_list) # 將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done) # 為所有對象添加回調

reactor.run()

注意:在上面正常使中yield getPage(...),@defer.inlineCallbacks預激活器也是返回了一個Deferred對象,他是否在預激活器中使用了一樣的方法。

五:Scrapy模擬

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor

count = 0
_close = None

def one_done(arg):
    print(arg)
    global count
    count += 1
    if count == 6:
        _close.callback(None)
        pass



def all_done(arg):
    print("all done")
    reactor.stop()

@defer.inlineCallbacks
def task(): #一個task相當於一個爬蟲 #看做是遞歸的 #第一個是start_url
    url = "http://www.baidu.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    #下面兩個可以看做parse中yield Request獲取的請求
    url = "http://cn.bing.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    url = "http://www.cctv.com"
    res = getPage(bytes(url, encoding="utf-8"))  # 獲取頁面,發送http請求
    res.addCallback(one_done)  # 對每一個請求都添加一個回調方法

    #若是任務沒有結束,Deferred對象就一直hold住事件循環,當任務結束,事件循環停止 global _close
    if not _close:
        _close = defer.Deferred()
    yield _close


spider1 = task()
spider2 = task()
#兩個爬蟲都是並發的,內部請求也是並發的

d = defer.DeferredList([spider1,spider2])  # 將上面的特殊對象列表一起放入DeferredList
d.addBoth(all_done)  # 為所有對象添加回調

reactor.run()  # 會一直循環,我們需要在任務執行完畢后關閉。含有計數器,執行一個任務,會執行一次one_done,計數減一。單任務執行完畢,計數為0,執行all_done

 六:Lower版Scrapy

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import getPage
import queue

#放置url,是調度器 Q = queue.Queue()

#定義的請求類,封裝請求數據
class Request:
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

#定義響應類,將獲取的數據進行封裝處理
class HttpResponse:
    def __init__(self,content,request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content,encoding="utf-8")

#定義的爬蟲
class ChoutiSpider:
    name = "chouti"
    start_url = ["http://www.baidu.com","http://www.baidu.com"]

    def start_request(self):
        for url in self.start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response.text)
        yield Request("http://www.baidu.com",callback=self.parse)

#引擎中心
class Engine:
    def __init__(self):
        self._close = None
        self.max = 5    #最大並發數限制
        self.crawlling = []
    
    #我們不直接去調用用戶的回調函數,我們先調用自定義的回調方法,在這個方法中去調用用戶的回調,所以我們可以完善用戶的方法
    def get_response_callback(self,content,request):
        #request中含有請求,content是響應
 # 1.crawlling移除 # 2.獲取parse yield值 # 3.再次去隊列中獲取
        self.crawlling.remove(request)
        req = HttpResponse(content=content,request=request)
        result = request.callback(req) #若是回調parse中只有輸出,返回None,有yield則返回生成器
        import types
        if isinstance(result,types.GeneratorType):  #若是生成器,代表用戶又發送了請求,我們需要去迭代獲取請求,放入調度器,等待獲取
            for req in result:  #多個yield
                Q.put(req)

    def _next_request(self):
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        if len(self.crawlling) >= self.max:
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)
                self.crawlling.append(req)
                d = getPage(req.url.encode("utf-8"))
                #任務下載完成,get_response_callback,調用用戶spider中定義的parse方法,並且將新請求添加到調度器
                d.addCallback(self.get_response_callback,req)
                #因為上面處理將移除一個請求,所以現在未達到最大並發數,可以再去調度器中獲取Request
                # d.addCallback(self._next_request)   #上面的addCallback可能會在向隊列中發送請求
                d.addCallback(lambda _:reactor.callLater(0,self._next_request)) #可以交給reactor調用,內部維護,防止死循環
            except Exception as e:
                return
    
    @defer.inlineCallbacks
    def crawl(self,spider):
        #開始進行下載
        #將初始Request對象添加到調度器
        start_request = iter(spider.start_request())
        while True:
            try:
                request = next(start_request)   #next激活,獲取request對象,兩步含有url和回調方法,我們將其放入調度器,等待去下載數據
                Q.put(request)
            except StopIteration as e:
                break

        #反復去調度器中去任務,並發送請求,下載數據
        # self._next_request()
        reactor.callLater(0, self._next_request)    #0秒后調用_next_request方法

        self._close = defer.Deferred()
        yield self._close

_active = set()
engine = Engine()

spider = ChoutiSpider()
d = engine.crawl(spider)    #由於返回的是Deferred對象,會阻塞在此,等待close后進行
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda _:reactor.stop())

reactor.run()

 

 七:完善版Scrapy

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import getPage
from queue import Queue

#定義的請求類,封裝請求數據
class Request:
    def __init__(self,url,callback):
        self.url = url
        self.callback = callback

#定義響應類,將獲取的數據進行封裝處理
class HttpResponse:
    def __init__(self,content,request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content,encoding="utf-8")


class Scheduler(object):
    """
    任務調度器
    """
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self,req):
        self.q.put(req)

    def size(self):
        return self.q.qsize()


class ExecutionEngine:
    '''引擎:負責所有調度'''
    def __init__(self):
        self._close = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []


    def get_response_callback(self,content,request):
        self.crawlling.remove(request)
        response = HttpResponse(content,request)
        result = request.callback(response)
        import types
        if isinstance(result,types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawlling.append(req)
            d = getPage(req.url.encode('utf-8'))
            d.addCallback(self.get_response_callback,req)
            d.addCallback(lambda _:reactor.callLater(0,self._next_request))

    @defer.inlineCallbacks
    def open_spider(self,start_requests):
        self.scheduler = Scheduler()
        yield self.scheduler.open() #yield None只是為了不報錯,因為在defer.inlineCallbacks需要當前函數是生成器
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)
        reactor.callLater(0,self._next_request)

    @defer.inlineCallbacks
    def start(self):
        self._close = defer.Deferred()
        yield self._close

class Crawler:
    '''用戶封裝調度器以及引擎'''
    def _create_engine(self):
        return ExecutionEngine()

    def _create_spider(self,spider_cls_path):
        module_path,cls_name = spider_cls_path.rsplit(".",maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m,cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self,spider_cls_path):
        engine = self._create_engine()
        print(engine)   #每個爬蟲創建一個引擎,一個調度器,去並發爬蟲中的請求
        spider = self._create_spider(spider_cls_path)
        start_requests = iter(spider.start_requests())
        yield engine.open_spider(start_requests)
        yield engine.start()

class CrawlerProcess:
    '''開啟事件循環'''
    def __init__(self):
        self._active = set()

    def crawl(self,spider_cls_path):
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)

    def start(self):
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _:reactor.stop())

        reactor.run()

class Command:
    '''命令'''
    def run(self):
        crawl_process = CrawlerProcess()
        spider_cls_path_list = ['spider.chouti.ChoutiSpider','spider.baidu.BaiduSpider']
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)
        crawl_process.start()

if __name__ == "__main__":
    cmd = Command()
    cmd.run()

spider目錄下兩個爬蟲

from engine import Request
class ChoutiSpider(object):

    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com','http://www.bing.com',]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
        yield Request('http://www.cnblogs.com',callback=self.parse)
ChoutiSpider
from engine import Request
class CnblogsSpider(object):

    name = 'baidu'

    def start_requests(self):
        start_url = ['http://www.baidu.com',]
        for url in start_url:
            yield Request(url,self.parse)

    def parse(self,response):
        print(response) #response是下載的頁面
        yield Request('http://www.baidu.com',callback=self.parse)
BaiduSpider

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM