# Threads and processes
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

pool = ThreadPoolExecutor(10)        # thread pool
# pool = ProcessPoolExecutor(10)     # process pool

for url in url_list:                 # assumes url_list and a download function func are defined
    pool.submit(func, url)
```
Using a socket to mimic a browser sending a request to the server
```python
import socket
import requests

requests.get(url="")    # requests is essentially a socket client under the hood

"""
socket client
socket server + async non-blocking IO  ->  web framework
mysql: port 3306
web:   port 80
"""

obj = socket.socket()
obj.connect(("192.168.3.254", 80))   # blocks until the connection is established
# whatever we send must follow the HTTP protocol
obj.send(b"GET /index HTTP/1.1\r\nHost: ...\r\nContent-Type: xxxx\r\n\r\n")
obj.recv(1024)                       # max bytes to receive; blocks waiting for the reply
obj.close()                          # close the connection
```
How to achieve high performance
- gevent
- twisted
- tornado
- asyncio
Phenomenon: a single thread handling concurrent requests
Essence: socket + IO multiplexing (see the asyncio sketch below)
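
asyncio appears in the list above but is not demonstrated in these notes. The following is a minimal sketch of my own (standard library only, host names reused from the examples here) showing the "one thread, many requests" idea: the event loop watches every socket via IO multiplexing and resumes each coroutine when its data arrives.

```python
import asyncio

async def fetch(host, path="/"):
    # the event loop watches this socket through its selector (IO multiplexing)
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(f"GET {path} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode())
    await writer.drain()
    data = await reader.read(8096)     # suspends this coroutine until data arrives
    writer.close()
    print(host, len(data))

async def main():
    # one thread, one event loop, all requests in flight at the same time
    hosts = ["www.baidu.com", "www.bing.com", "www.cnblogs.com"]
    await asyncio.gather(*(fetch(host) for host in hosts))

asyncio.run(main())
```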
```python
url_list = [
    'http://www.cnblogs.com/wupeiqi/articles/6229292.html',
    'http://www.baidu.com',
    'http://www.xiaohuar.com',
]

import requests

# 1. Serial (about 6s, one request at a time)
for url in url_list:
    response = requests.get(url)
    print(response.content)

# 2. Threads / processes (about 3s, three at once)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def task(url):
    response = requests.get(url)
    print(response.content)

pool = ThreadPoolExecutor(10)
# pool = ProcessPoolExecutor(10)
for url in url_list:
    # requests.get(url)
    pool.submit(task, url)        # take a thread from the pool and run task in it
pool.shutdown(wait=True)

# 3. Async non-blocking
#    [async]        -> callbacks
#    [non-blocking] -> never wait on the socket
#    blocking:      client = socket.socket(); client.connect((ip, port))
#    non-blocking:  client = socket.socket(); client.setblocking(False); client.connect((ip, port))
#    connect; send data; receive data
```
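
gevent, also on the list above, gets no example in these notes. Purely as an illustrative sketch of my own (requires the third-party gevent package): monkey-patching the standard socket module makes ordinary requests calls yield to other greenlets while waiting on IO, so all downloads overlap in a single thread.

```python
from gevent import monkey
monkey.patch_all()          # patch socket/ssl so blocking IO yields to other greenlets

import gevent
import requests

def fetch(url):
    response = requests.get(url)
    print(url, len(response.content))

url_list = ['http://www.baidu.com', 'http://www.bing.com', 'http://www.cnblogs.com']
jobs = [gevent.spawn(fetch, url) for url in url_list]
gevent.joinall(jobs)        # wait for all greenlets; the requests run concurrently in one thread
```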
Implementing high concurrency with twisted
```python
from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    reactor.stop()

def callback(contents):
    print(contents)

deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))   # like requests.get, but returns a Deferred
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addCallback(all_done)      # stop the reactor once every download has finished

reactor.run()
```
Asynchronous IO modules
```python
import socket
import time

client = socket.socket()
client.setblocking(False)       # non-blocking: instead of waiting, calls return immediately

try:
    client.connect(("61.135.169.125", 80))   # normally blocks; here the connect request is merely fired off
except BlockingIOError as e:
    print(e)

time.sleep(10)  # if the connection succeeds within these 10 seconds, sendall will work
                # or go do other computation in the meantime
                # but how do we know when to come back? we need to be notified that the connect succeeded
                # problem: notification + detection
"""
while True:
    fetch data from the database        # fire off all the requests first
    check whether the socket has connected
    if success:
        break
"""

data = b"GET / HTTP/1.1\r\nHost:www.baidu.com\r\nContent-Type:text/html\r\nUser-Agent:Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n\r\n"
client.sendall(data)

time.sleep(10)  # by now the response should have come back

response = client.recv(8096)    # non-blocking: returns data immediately, or raises if nothing has arrived
client.close()

"""
IO multiplexing: detects whether any of several socket objects has changed state.

while True:                                               # event loop
    r, w, e = select.select([sk1, sk2, sk3], [], [], 0.5) # poll every 0.5s

    # r: sockets that became readable, e.g. [sk1, sk3]
    #    a socket is readable once the response has come back
    for obj in r:
        response = obj.recv(...)
        print(response)

    # w: sockets whose connection succeeded, e.g. [sk2] -- writable
    for obj in w:
        obj.send("GET / HTTP/1.0\r\n...")

Async non-blocking, single thread; the while loop is the event loop.

Key points:
    client.setblocking(False)
    select.select detects: connection succeeded / data has come back
    the lists passed to select.select([], [], [], timeout) may contain any objects,
    as long as each has a fileno() method (returning the OS-level number for that socket);
    select works on the value returned by fileno(), not on the socket object itself.
"""
```
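
The docstring above describes the select loop only in pseudocode. Below is a runnable single-threaded version of the same idea, a sketch of my own rather than code from the original notes; the host names are just examples, and HTTP/1.0 is used so the servers close the connection after replying.

```python
import select
import socket

hosts = ["www.baidu.com", "www.bing.com"]
host_of = {}        # socket -> host name, so we know who replied
to_connect = []     # sockets waiting for connect() to finish (become writable)
to_read = []        # sockets waiting for the response (become readable)

for host in hosts:
    sk = socket.socket()
    sk.setblocking(False)
    try:
        sk.connect((host, 80))   # fires the connect and returns immediately
    except BlockingIOError:
        pass
    host_of[sk] = host
    to_connect.append(sk)

while to_connect or to_read:     # event loop
    r, w, _ = select.select(to_read, to_connect, [], 0.5)

    for sk in w:    # connect succeeded -> writable -> send the request
        sk.sendall(f"GET / HTTP/1.0\r\nHost: {host_of[sk]}\r\n\r\n".encode())
        to_connect.remove(sk)
        to_read.append(sk)

    for sk in r:    # response arrived -> readable -> read the first chunk and close
        print(host_of[sk], len(sk.recv(8096)))
        to_read.remove(sk)
        sk.close()
```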

A miniature Scrapy-style engine built on twisted's getPage and Deferreds, tying the ideas above together: a Scheduler queue, an ExecutionEngine that keeps at most five downloads in flight, and Spider classes that yield Requests.

```python
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue


class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode('utf-8')


class Request(object):
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        self.q.put(request)

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    def __init__(self):
        self._closewait = None
        self.running = True
        self.start_requests = None
        self.scheduler = Scheduler(self)
        self.inprogress = set()

    def check_empty(self, response):
        if not self.running:
            self._closewait.callback('......')

    def _next_request(self):
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        while len(self.inprogress) < 5 and self.scheduler.size() > 0:  # max concurrency is 5
            request = self.scheduler.next_request()
            if not request:
                break
            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        if len(self.inprogress) == 0 and self.scheduler.size() == 0:
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """
        Take the downloaded body, run the request's callback, and put any
        Request objects it yields back onto the scheduler's queue.
        :param body:
        :param request:
        :return:
        """
        import types
        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider
        yield None
        reactor.callLater(0, self._next_request)


class Crawler(object):
    def __init__(self, spidercls):
        self.spidercls = spidercls
        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        yield self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    def __init__(self):
        self._active = set()
        self.crawlers = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = Crawler(spidercls)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        reactor.stop()


class Spider(object):
    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        'http://dig.chouti.com/',
    ]

    def parse(self, response):
        print(response.text)


class CnblogsSpider(Spider):
    name = "cnblogs"
    start_urls = [
        'http://www.cnblogs.com/',
    ]

    def parse(self, response):
        print(response.text)


if __name__ == '__main__':
    spider_cls_list = [ChoutiSpider, CnblogsSpider]
    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)
    crawler_process.start()
```