# 线程,进程
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Sketch of the fan-out pattern: create a pool, then hand each job to it.
# `url_list`, `func` and `args` are placeholders supplied by the surrounding
# code.
#
# Thread pool with 10 workers.
# Process-pool alternative: pool = ProcessPoolExecutor(10)
pool = ThreadPoolExecutor(max_workers=10)
for url in url_list:
    pool.submit(func, args)
socket模仿浏览器向服务端发请求
import requests
import socket

# High-level way of issuing an HTTP request. The URL is left empty here as a
# placeholder; fill in a real one before running.
requests.get(url="")

# Notes:
#   socket client
#   socket server + asynchronous non-blocking I/O  ->  web framework
#   mysql : 3306
#   web   : 80

# The same thing done by hand: talk HTTP over a raw TCP socket, the way a
# browser does.
obj = socket.socket()  # the class lives inside the module: socket.socket()
obj.connect(("192.168.3.254", 80))  # blocks until the connection is made
obj.send(b"")  # whatever is sent must follow the HTTP protocol
# Sockets carry bytes, not str, and the request line must read "HTTP/1.1".
obj.send(b"GET /index HTTP/1.1\r\nhost:...\r\ncontent-type:xxxx\r\n\r\n")
obj.recv(1024)  # maximum number of bytes to read; blocks until data arrives
obj.close()  # drop the connection
如何实现高性能
- gevent
- twisted
- tornado
- asyncio
现象:一个线程实现并发请求
本质:socket+IO多路复用
url_list = [
    'http://www.cnblogs.com/wupeiqi/articles/6229292.html',
    'http://www.baidu.com',
    'http://www.xiaohuar.com',
]

import requests

# 1. Serial (6s, one request at a time)
for url in url_list:
    response = requests.get(url)
    print(response.content)

# 2. Threads / processes (3s, three at a time)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def tast(url):
    """Download ``url`` with requests and print the raw response body."""
    print(requests.get(url).content)


pool = ThreadPoolExecutor(max_workers=10)
# Process-based alternative: pool = ProcessPoolExecutor(10)
for url in url_list:
    # Rather than calling requests.get(url) inline, hand the job to the pool:
    # a thread is taken from the pool and runs the task function.
    pool.submit(tast, url)
pool.shutdown(wait=True)

# 3. Asynchronous, non-blocking
#    [asynchronous]  callbacks
#    [non-blocking]  do not wait; done at the socket level
#    blocking:       client = socket(); client.connect(ip, port)
#    non-blocking:   client = socket(); client.setblocking(False);
#                    client.connect(ip, port)
#    steps: connect; send data; receive data
使用twisted实现高并发
from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    """Stop the reactor; ``arg`` (the DeferredList result) is not used."""
    reactor.stop()


def callback(contents):
    """Print the downloaded page contents."""
    print(contents)


deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    # getPage plays the role requests.get had in the blocking version, but
    # returns a Deferred immediately instead of waiting for the response.
    deferred = getPage(url.encode('utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

# Once every download has finished, shut the event loop down.
dlist = defer.DeferredList(deferred_list)
dlist.addCallback(all_done)

reactor.run()
异步IO模块
import socket
import time

client = socket.socket()
client.setblocking(False)  # non-blocking: calls return at once instead of waiting
try:
    # Would block on a normal socket. Here the connection request has already
    # been sent by the time BlockingIOError is raised.
    client.connect(("61.135.169.125", 80))
except BlockingIOError as e:
    print(e)

time.sleep(10)
# If the connection completes within these 10 seconds, sendall will succeed.
# The time could instead be spent on other computation.
# How do we know how long to spend on other work? We want to be notified when
# the connection has succeeded and then come back to continue.
# Open problem: notification / detection.
#
#   while True:
#       fetch data from the database   # send out all the requests
#       check whether the socket has connected
#       if success: break

# Built from adjacent literals so the request stays a single valid bytes
# object: the User-Agent header must not contain a raw line break, and the
# request line must read "HTTP/1.1".
data = (
    b"GET / HTTP/1.1\r\n"
    b"Host:www.baidu.com\r\n"
    b"Content-Type:text/html\r\n"
    b"User-Agent:Mozilla/5.0 (Windows NT 10.0; WOW64) "
    b"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n"
    b"\r\n"
)
client.sendall(data)

time.sleep(10)  # the response is expected to have arrived by now
# Non-blocking read: takes the data immediately; raises if nothing has
# arrived yet, otherwise returns normally.
response = client.recv(8096)
client.close()

# I/O multiplexing: used to detect whether any of several socket objects
# has changed state.
#
#   while True:   # event loop
#       r, w, e = select.select([sk1, sk2, sk3], [], [], 0.5)   # every 0.5s
#       # r: members of the first list that changed, e.g. [sk1, sk3] -> readable.
#       #    A socket is readable once content has come back on it.
#       for obj in r:
#           response = obj.recv(...)
#           print(response)
#       # w: sockets whose connection succeeded, e.g. [sk2] -> writable.
#       for obj in w:
#           obj.send("GET /http/1.0...")
#
# Asynchronous + non-blocking, single thread; the while loop is the event loop.
#
# Key points:
#   client.setblocking(False)
#   select.select detects: connection succeeded, data has come back
#   select.select([], [], [], timeout)
#   The items may be any objects as long as they have a fileno() method
#   (which returns the number the OS uses for that socket); select works
#   with the value returned by fileno(), not with the socket object itself.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# A miniature Scrapy-style crawler built on Twisted: spiders yield Request
# objects, a Scheduler queues them, and an ExecutionEngine downloads a bounded
# number of them concurrently and feeds each result to the spider's callback.
import queue
import types

from twisted.web.client import getPage, defer
from twisted.internet import reactor


class Response(object):
    """Downloaded body together with the request that produced it."""

    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        """Return ``body`` decoded as UTF-8."""
        return self.body.decode('utf-8')


class Request(object):
    """A URL to fetch plus an optional callback that receives the Response."""

    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    """FIFO queue of pending requests."""

    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        """Append ``request`` to the queue."""
        self.q.put(request)

    def next_request(self):
        """Return the next queued request, or None when the queue is empty."""
        try:
            return self.q.get(block=False)
        except queue.Empty:
            # Only "nothing queued" means "no request"; any other error is a
            # real bug and must not be swallowed.
            return None

    def size(self):
        """Return the number of queued requests."""
        return self.q.qsize()


class ExecutionEngine(object):
    """Pulls requests from the spider and scheduler and downloads them."""

    # Upper bound on simultaneously running downloads.
    max_concurrency = 5

    def __init__(self):
        self._closewait = None      # Deferred created by start(); fired when the crawl ends
        self.running = True
        self.start_requests = None  # iterator of initial requests, None once exhausted
        self.spider = None          # set by open_spider()
        self.scheduler = Scheduler(self)
        self.inprogress = set()     # requests currently being downloaded

    def check_empty(self, response):
        """Fire the close-wait deferred if the engine is no longer running.

        ``response`` is not used.
        """
        if not self.running:
            self._closewait.callback('......')

    def _next_request(self):
        """Queue pending start requests, launch downloads, detect completion."""
        # Drain the spider's start requests into the scheduler.
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        # Start downloads until the concurrency limit is reached or the
        # queue runs dry.
        while (len(self.inprogress) < self.max_concurrency
               and self.scheduler.size() > 0):
            request = self.scheduler.next_request()
            if not request:
                break
            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            # addBoth: each step runs whether the previous one succeeded or
            # failed, so the request is always removed from `inprogress` and
            # the engine is always asked to schedule more work.
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        # Nothing running and nothing queued: the crawl is finished. The
        # `called` check keeps a re-entrant invocation (a download deferred
        # that fires synchronously) from firing the deferred a second time.
        if (not self.inprogress and self.scheduler.size() == 0
                and not self._closewait.called):
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """Run the request's callback on the download and queue what it yields.

        The callback is ``request.callback`` or, if that is unset, the
        spider's ``parse``. When the callback returns a generator, every item
        it yields is enqueued as a new request.

        :param body: result of the download; since this is attached with
            ``addBoth`` it is whatever the deferred produced, including a
            failure when the download did not succeed
        :param request: the Request that was downloaded
        :return: None
        """
        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        """Create the close-wait deferred and wait until it fires."""
        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests):
        """Attach ``spider`` and its start requests, then schedule the first pass."""
        self.start_requests = start_requests
        self.spider = spider
        yield None
        reactor.callLater(0, self._next_request)


class Crawler(object):
    """Binds one spider class to its own ExecutionEngine."""

    def __init__(self, spidercls):
        self.spidercls = spidercls
        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        """Instantiate engine and spider, open the spider, wait for the engine."""
        self.engine = ExecutionEngine()
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        yield self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    """Runs several crawlers in one reactor and stops it when all are done."""

    def __init__(self):
        self._active = set()   # deferreds of crawls that have been started
        self.crawlers = set()

    def crawl(self, spidercls, *args, **kwargs):
        """Create a Crawler for ``spidercls``, start it, return its deferred."""
        crawler = Crawler(spidercls)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        """Run the reactor until every started crawl has finished."""
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        """Stop the reactor; the argument is ignored."""
        reactor.stop()


class Spider(object):
    """Base spider: yields one Request per entry in ``start_urls``."""

    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        'http://dig.chouti.com/',
    ]

    def parse(self, response):
        """Print the decoded page."""
        print(response.text)


class CnblogsSpider(Spider):
    name = "cnblogs"
    start_urls = [
        'http://www.cnblogs.com/',
    ]

    def parse(self, response):
        """Print the decoded page."""
        print(response.text)


if __name__ == '__main__':
    spider_cls_list = [ChoutiSpider, CnblogsSpider]
    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)
    crawler_process.start()