一、RabbitMQ
隊列
在生產者消費模型中,比如去餐館吃飯的例子。生產者相當於廚師,隊列相當於服務員,消費者就是你。
我們必須通過服務員,才能吃飯!
如果隊列滿了,隊列會一直hold住。必須讓消費者,獲取一個,隊列才能解除hold狀態。
隊列本身就有一個鎖,保證數據安全
舉例:

import queue q = queue.Queue(maxsize=10) q.put(10) q.put(8) q.put(6) print(q.get()) print(q.get()) print(q.get()) print(q.get())
執行輸出:
10 8 6
注意:此時程序並沒有結束掉!由於隊列已經空了,最后一個get操作會hold住。
如果不想hold住,加一個參數block=0就可以了

import queue q = queue.Queue(maxsize=10) q.put(10) q.put(8) q.put(6) print(q.get()) print(q.get()) print(q.get()) print(q.get(block=0))
執行報錯
queue.Empty
這個時候,應該使用try

import queue q = queue.Queue(maxsize=10) q.put(10) q.put(8) q.put(6) print(q.get()) print(q.get()) print(q.get()) try: print(q.get(block=0)) except Exception as e: print("raise Empty")
關於隊列,請參考鏈接:
https://www.cnblogs.com/yuanchenqi/articles/6755717.html#_label1
什么叫消息隊列
消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。
消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
為何用消息隊列
從上面的描述中可以看出消息隊列是一種應用間的異步協作機制,那什么時候需要使用 MQ 呢?
以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,隨着業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。
RabbitMQ
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
rabbitMQ是一款基於AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。
官方文檔:
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
中文文檔:
rabbitMQ安裝
linux平台

1.安裝配置epel源 rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 2.安裝erlang yum -y install erlang 3.安裝RabbitMQ yum -y install rabbitmq-server 4.啟動服務 centos6: service rabbitmq-server start centos7: systemctl start rabbitmq-server 5.啟動web管理插件 rabbitmq-plugins enable rabbitmq_management 6.重啟rabbitmq生效web插件 centos6: service rabbitmq-server restart centos7: systemctl restart rabbitmq-server 訪問頁面: http://ip地址:15672 # 添加賬戶 rabbitmqctl add_user admin 123456 # 設置為超級管理員 rabbitmqctl set_user_tags admin administrator
mac
bogon:~ yuan$ brew install rabbitmq bogon:~ yuan$ export PATH=$PATH:/usr/local/sbin bogon:~ yuan$ rabbitmq-server
windows

1.安裝erlang 雙擊運行opt_win64_21.1.exe 2.安裝rabbitmq 雙擊運行 rabbitmq-server-3.7.8 3.添加windows環境變量 Path=%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin 4.檢測rabbitmq狀態 rabbitmqctl status 5.啟動web管理插件 rabbitmq-plugins enable rabbitmq_management 6.登錄web管理界面,賬號密碼默認都是guest,guest http://127.0.0.1:15672/ rabbitmq 5672 是提供客戶端連接的端口, 15672是提供web管理的端口
rabbitMQ工作模型
簡單模式
安裝pkia
pip3 install pika
示例
注意:本環境的RabbitMQ是安裝在Centos 7 x64系統上面的,IP地址為:192.168.142.128,默認端口5672
生產者
producer.py

import pika # 基於socket連接中間服務器上的rabbitmq connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.142.128')) # 創建對象 channel = connection.channel() # 聲明一個名為hello的隊列 channel.queue_declare(queue='hello') # 插數據 channel.basic_publish(exchange='', # 交換機 routing_key='hello', # 指定的隊列名稱 body='Hello Yuan!') # 值 print(" [x] Sent 'Hello Yuan!'") connection.close()
注意:在簡單模式中,是沒有交換機的。所以exchange參數的值為空
消費者
consumer.py

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.142.128')) channel = connection.channel() # 聲明一個名為hello的隊列 channel.queue_declare(queue='hello') # 確定回調函數 def callback(ch, method, properties, body): print(" Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
先執行producer.py,輸出:
[x] Sent 'Hello Yuan!'
再執行consumer.py,輸出:
[*] Waiting for messages. To exit press CTRL+C Received b'Hello Yuan!'
消費者接收到了 Hello Yuan!
為什么消費者要聲明一個名為hello的隊列呢?生產者,不是明明已經聲明了隊列了嗎?
注意:
如果生產者先運行,那么就會創建hello隊列。那么消費者運行時,就不會創建hello隊列。這句代碼,不會執行!
channel.queue_declare(queue='hello')
如果消費者先執行,那么這里就會創建。假設沒有創建hello隊列,執行就會報錯!
其實生產者和消費者,誰來創建,都無所謂。只要保證隊列存在就可以了!
相關參數
(1) no-ack = False
如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。
- 回調函數中的
ch.basic_ack(delivery_tag=method.delivery_tag)
- basic_comsume中的
no_ack=False
消息接收端應該這么寫:

import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.211.55.4')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
(2) durable :消息不丟失
生產者

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
消費者

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
(3) 消息獲取順序
默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列

import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
exchange模型
3.1 發布訂閱
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
關鍵參數:
exchange type = fanout
生產者

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消費者

import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
3.2 關鍵字發送
關鍵參數
exchange type = direct
之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
3.3 模糊匹配
關鍵參數
exchange type = topic 發送者路由值 隊列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配 0 個 或 多個 單詞
- * 表示只能匹配 一個 單詞
示例:

import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
由於時間關系,詳細過程略...
本文參考鏈接:
https://www.cnblogs.com/yuanchenqi/articles/8507109.html
二、基於scrapy-redis實現分布式爬蟲
Scrapy-Redis則是一個基於Redis的Scrapy分布式組件。它利用Redis對用於爬取的請求(Requests)進行存儲和調度(Schedule),並對爬取產生的項目(items)存儲以供后續處理使用。scrapy-redi重寫了scrapy一些比較關鍵的代碼,將scrapy變成一個可以在多個主機上同時運行的分布式爬蟲。
單機玩法:
按照正常流程就是大家都會進行重復的采集;我們都知道進程之間內存中的數據不可共享的,那么你在開啟多個Scrapy的時候,它們相互之間並不知道對方采集了些什么那些沒有沒采集。那就大家伙兒自己玩自己的了。完全沒沒有效率的提升啊!
怎么解決呢?
這就是我們Scrapy-Redis解決的問題了,不能協作不就是因為請求和去重這兩個不能共享嗎?
那我把這兩個獨立出來好了。
將Scrapy中的調度器組件獨立放到大家都能訪問的地方不就OK啦!加上scrapy,Redis的后流程圖就應該變成這樣了
分布式玩法:
1. redis連接
配置scrapy使用redis提供的共享去重隊列
# 在settings.py中配置鏈接Redis REDIS_HOST = 'localhost' # 主機名 REDIS_PORT = 6379 # 端口 REDIS_URL = 'redis://user:pass@hostname:9001' # 連接URL(優先於以上配置) REDIS_PARAMS = {} # Redis連接參數 REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定連接Redis的Python模塊 REDIS_ENCODING = "utf-8" # redis編碼類型 # 默認配置:\python3.6\Lib\site-packages\scrapy_redis\defaults.py
2. dupefilter
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" #使用scrapy-redis提供的去重功能,查看源碼會發現是基於Redis的集合實現的 # 需要指定Redis中集合的key名,key=存放不重復Request字符串的集合 DUPEFILTER_KEY = 'dupefilter:%(timestamp)s' #源碼:dupefilter.py內一行代碼key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}
3. Scheduler

#1、源碼:\python3.6\Lib\site-packages\scrapy_redis\scheduler.py #2、settings.py配置 # Enables scheduling storing requests queue in redis. SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 調度器將不重復的任務用pickle序列化后放入共享任務隊列,默認使用優先級隊列(默認),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表) SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' # 對保存到redis中的request對象進行序列化,默認使用pickle SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat" # 調度器中請求任務序列化后存放在redis中的key SCHEDULER_QUEUE_KEY = '%(spider)s:requests' # 是否在關閉時候保留原來的調度器和去重記錄,True=保留,False=清空 SCHEDULER_PERSIST = True # 是否在開始之前清空 調度器和去重記錄,True=清空,False=不清空 SCHEDULER_FLUSH_ON_START = False # 去調度器中獲取數據時,如果為空,最多等待時間(最后沒數據,未獲取到)。如果沒有則立刻返回會造成空循環次數過多,cpu占用率飆升 SCHEDULER_IDLE_BEFORE_CLOSE = 10 # 去重規則,在redis中保存時對應的key SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' # 去重規則對應處理的類,將任務request_fingerprint(request)得到的字符串放入去重隊列 SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
4. RedisPipeline(持久化)
ITEM_PIPELINES = { 'scrapy_redis.pipelines.RedisPipeline': 300, } #將item持久化到redis時,指定key和序列化函數 REDIS_ITEMS_KEY = '%(spider)s:items' REDIS_ITEMS_SERIALIZER = 'json.dumps'
5. 從Redis中獲取起始URL
scrapy程序爬取目標站點,一旦爬取完畢后就結束了,如果目標站點更新內容了,我們想重新爬取,那么只能再重新啟動scrapy,非常麻煩 scrapy-redis提供了一種供,讓scrapy從redis中獲取起始url,如果沒有scrapy則過一段時間再來取而不會關閉 這樣我們就只需要寫一個簡單的腳本程序,定期往redis隊列里放入一個起始url。 #具體配置如下 #1、編寫爬蟲時,起始URL從redis的Key中獲取 REDIS_START_URLS_KEY = '%(name)s:start_urls' #2、獲取起始URL時,去集合中獲取還是去列表中獲取?True,集合;False,列表 REDIS_START_URLS_AS_SET = False # 獲取起始URL時,如果為True,則使用self.server.spop;如果為False,則使用self.server.lpop
由於時間關系,詳細過程略...
本文參考鏈接:
https://www.cnblogs.com/yuanchenqi/articles/9509793.html#_label7
未完待續...