Django - Asynchronous Non-Blocking


# Threads and Processes

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

pool = ThreadPoolExecutor(10)  # thread pool
# pool = ProcessPoolExecutor(10)  # process pool

for url in url_list:
    pool.submit(func, url)  # grab a worker from the pool and run func(url)

Using a socket to mimic a browser sending a request to a server

import requests
import socket

requests.get(url="")  # under the hood, requests is just a socket client

"""
socket client

socket server + async non-blocking I/O = a web framework

mysql   :3306
web     :80
"""
obj = socket.socket()
obj.connect(("192.168.3.254", 80))  # blocking: waits until the connection succeeds
# follow the HTTP protocol
obj.send(b"GET /index HTTP/1.1\r\nHost: ...\r\nContent-Type: xxxx\r\n\r\n")
obj.recv(1024)  # max bytes to receive; blocking (waits for the reply to come back)
obj.close()  # close the connection
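
A runnable variant of the same idea (a sketch; www.example.com is only a stand-in server): send one well-formed HTTP/1.0 request, then loop on recv() until the server closes the connection.

import socket

obj = socket.socket()
obj.connect(("www.example.com", 80))  # blocking: waits for the TCP handshake
obj.sendall(b"GET / HTTP/1.0\r\nHost: www.example.com\r\n\r\n")

chunks = []
while True:
    chunk = obj.recv(1024)  # blocking: waits for the next piece of the reply
    if not chunk:           # b"" means the server closed the connection
        break
    chunks.append(chunk)
obj.close()
print(b"".join(chunks).decode("utf-8", "ignore"))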

How to achieve high performance

- gevent
- twisted
- tornado
- asyncio
    Symptom: a single thread handles many concurrent requests (see the asyncio sketch below)
    Essence: socket + I/O multiplexing
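
A minimal single-threaded sketch of that symptom using asyncio (assumptions: Python 3.7+, and the hosts are only examples):

import asyncio

async def fetch(host):
    # open_connection drives a non-blocking socket from the event loop
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(b"GET / HTTP/1.0\r\nHost: " + host.encode() + b"\r\n\r\n")
    await writer.drain()
    data = await reader.read(1024)  # yields to the loop while waiting
    print(host, len(data), "bytes")
    writer.close()

async def main():
    hosts = ["www.baidu.com", "www.bing.com"]
    await asyncio.gather(*(fetch(h) for h in hosts))  # all in one thread

asyncio.run(main())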

url_list = [
    'http://www.cnblogs.com/wupeiqi/articles/6229292.html',
    'http://www.baidu.com',
    'http://www.xiaohuar.com',
]
import requests

# 1. Serial (~6s total: one request at a time)
for url in url_list:
    response = requests.get(url)
    print(response.content)

# 2. Threads / processes (~3s: three at once)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def task(url):
    response = requests.get(url)
    print(response.content)

pool = ThreadPoolExecutor(10)
# pool = ProcessPoolExecutor(10)
for url in url_list:
    # requests.get(url)
    pool.submit(task, url)  # grab a thread from the pool and run task(url)
pool.shutdown(wait=True)

# 3. Asynchronous non-blocking
#   [asynchronous]: callbacks
#   [non-blocking]: don't wait on the socket
#       blocking:     client = socket.socket(); client.connect((ip, port))
#       non-blocking: client = socket.socket(); client.setblocking(False); client.connect((ip, port))
#                     then: connect; send data; receive data
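
A tiny runnable illustration of that difference (a sketch; the host is only an example):

import socket

client = socket.socket()
client.setblocking(False)                  # switch the socket to non-blocking mode
try:
    client.connect(("www.baidu.com", 80))  # returns immediately instead of waiting
except BlockingIOError:
    print("connect is in flight; free to do other work meanwhile")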

Using twisted to achieve high concurrency

from twisted.web.client import getPage, defer
from twisted.internet import reactor


def all_done(arg):
    reactor.stop()


def callback(contents):
    print(contents)


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))  # plays the role of requests.get, returns a Deferred
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addCallback(all_done)

reactor.run()
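
Note: getPage is deprecated in newer Twisted releases and was eventually removed. A rough equivalent using the third-party treq package (an assumption: installed via pip install treq) would be this sketch:

import treq
from twisted.internet import reactor, defer


def all_done(arg):
    reactor.stop()


def print_body(body):
    print(body)


deferred_list = []
for url in ['http://www.bing.com', 'http://www.baidu.com']:
    d = treq.get(url)            # a Deferred that fires with a response object
    d.addCallback(treq.content)  # then a Deferred that fires with the body bytes
    d.addCallback(print_body)
    deferred_list.append(d)

defer.DeferredList(deferred_list).addBoth(all_done)
reactor.run()
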
Asynchronous I/O module

import socket
import time

client = socket.socket()
client.setblocking(False)  # non-blocking: calls return immediately instead of waiting

try:
    client.connect(("61.135.169.125", 80))  # would block; now it only fires off the connect request
except BlockingIOError as e:
    print(e)

time.sleep(10)  # if the connection succeeded within 10 seconds, sendall will work
# ...or spend that time on other computation instead.
# How do we know when to come back? We need to be notified that the connect succeeded.
# Problem: notification + detection
"""
while True:
    fetch data from the database            # all the requests have already been sent
    check whether the socket has connected
    if success:
        break
"""
data = b"GET / HTTP/1.1\r\nHost:www.baidu.com\r\nContent-Type:text/html\r\nUser-Agent:Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\r\n\r\n"
client.sendall(data)

time.sleep(10)  # by now the response has come back

response = client.recv(8096)  # non-blocking: returns data if any, raises if none yet
client.close()

"""
I/O multiplexing: detect whether any of several socket objects has changed.

while True:                                                # event loop
    r, w, e = select.select([sk1, sk2, sk3], [], [], 0.5)  # poll every 0.5s
    # r: members of the first list that changed, e.g. [sk1, sk3] -- readable.
    #    A socket that has returned content is readable.
    for obj in r:
        response = obj.recv(...)
        print(response)
    # w: sockets whose connection succeeded, e.g. [sk2] -- writable.
    for obj in w:
        obj.send("GET / HTTP/1.0\r\n...")

Asynchronous non-blocking = a single thread + a while loop (the event loop).

Key points:
    client.setblocking(False)
    select.select detects "connection succeeded" and "data has come back".
    select.select([], [], [], timeout) accepts any object, as long as it has a
    fileno() method (returning the number the OS uses for that socket);
    select works with the value fileno() returns, not the socket object itself.
"""

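Putting that I/O-multiplexing description into runnable form (a sketch; the hosts are only examples and error handling is omitted): one thread, non-blocking sockets, and select as the detector.

import select
import socket

hosts = ["www.baidu.com", "www.bing.com"]
sock_host = {}  # socket -> host, so output can be labelled

for host in hosts:
    sk = socket.socket()
    sk.setblocking(False)
    try:
        sk.connect((host, 80))  # returns immediately; the connect is in flight
    except BlockingIOError:
        pass
    sock_host[sk] = host

to_write = list(sock_host)  # waiting for the connection to succeed
to_read = []                # waiting for the response

while to_write or to_read:  # the event loop
    r, w, e = select.select(to_read, to_write, [], 0.5)
    for sk in w:            # writable => connection succeeded
        host = sock_host[sk]
        sk.sendall(("GET / HTTP/1.0\r\nHost: %s\r\n\r\n" % host).encode())
        to_write.remove(sk)
        to_read.append(sk)
    for sk in r:            # readable => (part of) the response has arrived
        print(sock_host[sk], sk.recv(8096)[:60])
        sk.close()
        to_read.remove(sk)
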
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# custom_scrapy: a miniature Scrapy-style engine built on Twisted.
# Scheduler holds the request queue; ExecutionEngine pulls requests off it and
# drives downloads through the reactor with a concurrency cap; Crawler and
# CrawlerProcess run several spiders inside a single event loop.
from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue


class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode('utf-8')


class Request(object):
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        self.q.put(request)

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None

        return req

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    def __init__(self):
        self._closewait = None
        self.running = True
        self.start_requests = None
        self.scheduler = Scheduler(self)

        self.inprogress = set()

    def check_empty(self, response):
        if not self.running:
            self._closewait.callback('......')

    def _next_request(self):
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        while len(self.inprogress) < 5 and self.scheduler.size() > 0:  # cap concurrency at 5

            request = self.scheduler.next_request()
            if not request:
                break

            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        if len(self.inprogress) == 0 and self.scheduler.size() == 0:
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """
        Take the downloaded body, run the request's callback, and if the
        callback returns a generator of new Requests, put them on the queue.
        :param body:
        :param request:
        :return:
        """
        import types

        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider
        yield None
        reactor.callLater(0, self._next_request)


class Crawler(object):
    def __init__(self, spidercls):
        self.spidercls = spidercls

        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        yield self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    def __init__(self):
        self._active = set()
        self.crawlers = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = Crawler(spidercls)
        self.crawlers.add(crawler)
        
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        reactor.stop()


class Spider(object):
    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        'http://dig.chouti.com/',
    ]

    def parse(self, response):
        print(response.text)


class CnblogsSpider(Spider):
    name = "cnblogs"
    start_urls = [
        'http://www.cnblogs.com/',
    ]

    def parse(self, response):
        print(response.text)


if __name__ == '__main__':

    spider_cls_list = [ChoutiSpider, CnblogsSpider]

    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)

    crawler_process.start()
custom_scrapy
