python 全棧開發,Day140(RabbitMQ,基於scrapy-redis實現分布式爬蟲)


 

一、RabbitMQ

隊列

在生產者消費模型中,比如去餐館吃飯的例子。生產者相當於廚師,隊列相當於服務員,消費者就是你。

我們必須通過服務員,才能吃飯!

如果隊列滿了,隊列會一直hold住。必須讓消費者,獲取一個,隊列才能解除hold狀態。

隊列本身就有一個鎖,保證數據安全

 

舉例:

import queue
q = queue.Queue(maxsize=10)
q.put(10)
q.put(8)
q.put(6)

print(q.get())
print(q.get())
print(q.get())
print(q.get())
View Code

執行輸出:

10
8
6

注意:此時程序並沒有結束掉!由於隊列已經空了,最后一個get操作會hold住。

 

如果不想hold住,加一個參數block=0就可以了

import queue
q = queue.Queue(maxsize=10)
q.put(10)
q.put(8)
q.put(6)

print(q.get())
print(q.get())
print(q.get())
print(q.get(block=0))
View Code

執行報錯

queue.Empty

 

這個時候,應該使用try

import queue
q = queue.Queue(maxsize=10)
q.put(10)
q.put(8)
q.put(6)

print(q.get())
print(q.get())
print(q.get())
try:
    print(q.get(block=0))
except Exception as e:
    print("raise Empty")
View Code

 

關於隊列,請參考鏈接:

https://www.cnblogs.com/yuanchenqi/articles/6755717.html#_label1

 

什么叫消息隊列

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

為何用消息隊列

從上面的描述中可以看出消息隊列是一種應用間的異步協作機制,那什么時候需要使用 MQ 呢?

以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,隨着業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。

詳細

RabbitMQ 

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

rabbitMQ是一款基於AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。

 

官方文檔:

https://www.rabbitmq.com/tutorials/tutorial-one-python.html

中文文檔:

http://rabbitmq.mr-ping.com/

 

rabbitMQ安裝

linux平台

1.安裝配置epel源
rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
2.安裝erlang
yum -y install erlang
 
3.安裝RabbitMQ
yum -y install rabbitmq-server


4.啟動服務
centos6:
service rabbitmq-server start

centos7:
systemctl start rabbitmq-server


5.啟動web管理插件
rabbitmq-plugins enable rabbitmq_management

6.重啟rabbitmq生效web插件
centos6:
service rabbitmq-server restart

centos7:
systemctl restart rabbitmq-server


訪問頁面:  http://ip地址:15672
# 添加賬戶
rabbitmqctl add_user admin 123456
# 設置為超級管理員
rabbitmqctl set_user_tags admin administrator
View Code

 

mac

bogon:~ yuan$ brew install rabbitmq
bogon:~ yuan$ export PATH=$PATH:/usr/local/sbin
bogon:~ yuan$ rabbitmq-server

 

windows

1.安裝erlang
雙擊運行opt_win64_21.1.exe

2.安裝rabbitmq
雙擊運行 rabbitmq-server-3.7.8

3.添加windows環境變量
Path=%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin

4.檢測rabbitmq狀態
rabbitmqctl status

5.啟動web管理插件
rabbitmq-plugins enable rabbitmq_management

6.登錄web管理界面,賬號密碼默認都是guest,guest
http://127.0.0.1:15672/

rabbitmq 5672  是提供客戶端連接的端口,  15672是提供web管理的端口
View Code

 

rabbitMQ工作模型

簡單模式

 

安裝pkia

pip3 install pika

 

示例

注意:本環境的RabbitMQ是安裝在Centos 7 x64系統上面的,IP地址為:192.168.142.128,默認端口5672

生產者

producer.py

import pika


# 基於socket連接中間服務器上的rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.142.128'))

#  創建對象
channel = connection.channel()

# 聲明一個名為hello的隊列
channel.queue_declare(queue='hello')

# 插數據
channel.basic_publish(exchange='',   # 交換機
                      routing_key='hello', # 指定的隊列名稱
                      body='Hello Yuan!')  #

print(" [x] Sent 'Hello Yuan!'")
connection.close()
View Code

注意:在簡單模式中,是沒有交換機的。所以exchange參數的值為空

 

消費者

consumer.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.142.128'))
channel = connection.channel()

# 聲明一個名為hello的隊列
channel.queue_declare(queue='hello')


# 確定回調函數
def callback(ch, method, properties, body):
    print(" Received %r" % body)


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 

先執行producer.py,輸出:

 [x] Sent 'Hello Yuan!'

再執行consumer.py,輸出:

 [*] Waiting for messages. To exit press CTRL+C
 Received b'Hello Yuan!'

消費者接收到了 Hello Yuan!

 

為什么消費者要聲明一個名為hello的隊列呢?生產者,不是明明已經聲明了隊列了嗎?

注意:

如果生產者先運行,那么就會創建hello隊列。那么消費者運行時,就不會創建hello隊列。這句代碼,不會執行!

channel.queue_declare(queue='hello')

如果消費者先執行,那么這里就會創建。假設沒有創建hello隊列,執行就會報錯!
其實生產者和消費者,誰來創建,都無所謂。只要保證隊列存在就可以了!

相關參數

(1) no-ack = False

如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。

  • 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False

消息接收端應該這么寫:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 

(2)  durable  :消息不丟失

生產者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
View Code

 

消費者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 

(3) 消息獲取順序

默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code

 

exchange模型

3.1 發布訂閱

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

關鍵參數:

exchange type = fanout

 

生產者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
View Code

 

消費者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

 

3.2 關鍵字發送

關鍵參數

exchange type = direct

 

之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

 

3.3 模糊匹配

關鍵參數

exchange type = topic

發送者路由值              隊列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配

 

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示可以匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞

 示例:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

 

由於時間關系,詳細過程略...

 

本文參考鏈接:

https://www.cnblogs.com/yuanchenqi/articles/8507109.html

 

二、基於scrapy-redis實現分布式爬蟲

 Scrapy-Redis則是一個基於Redis的Scrapy分布式組件。它利用Redis對用於爬取的請求(Requests)進行存儲和調度(Schedule),並對爬取產生的項目(items)存儲以供后續處理使用。scrapy-redi重寫了scrapy一些比較關鍵的代碼,將scrapy變成一個可以在多個主機上同時運行的分布式爬蟲。

單機玩法:

 

按照正常流程就是大家都會進行重復的采集;我們都知道進程之間內存中的數據不可共享的,那么你在開啟多個Scrapy的時候,它們相互之間並不知道對方采集了些什么那些沒有沒采集。那就大家伙兒自己玩自己的了。完全沒沒有效率的提升啊!

怎么解決呢?

這就是我們Scrapy-Redis解決的問題了,不能協作不就是因為請求和去重這兩個不能共享嗎?

那我把這兩個獨立出來好了。

將Scrapy中的調度器組件獨立放到大家都能訪問的地方不就OK啦!加上scrapy,Redis的后流程圖就應該變成這樣了

分布式玩法:

 

1. redis連接

配置scrapy使用redis提供的共享去重隊列

# 在settings.py中配置鏈接Redis
REDIS_HOST = 'localhost'                            # 主機名
REDIS_PORT = 6379                                   # 端口
REDIS_URL = 'redis://user:pass@hostname:9001'       # 連接URL(優先於以上配置)
REDIS_PARAMS  = {}                                  # Redis連接參數
REDIS_PARAMS['redis_cls'] = 'myproject.RedisClient' # 指定連接Redis的Python模塊
REDIS_ENCODING = "utf-8"                            # redis編碼類型  
# 默認配置:\python3.6\Lib\site-packages\scrapy_redis\defaults.py

 

2. dupefilter

DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
#使用scrapy-redis提供的去重功能,查看源碼會發現是基於Redis的集合實現的


# 需要指定Redis中集合的key名,key=存放不重復Request字符串的集合
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'
#源碼:dupefilter.py內一行代碼key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}

 

3. Scheduler

#1、源碼:\python3.6\Lib\site-packages\scrapy_redis\scheduler.py


#2、settings.py配置

# Enables scheduling storing requests queue in redis.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"       

# 調度器將不重復的任務用pickle序列化后放入共享任務隊列,默認使用優先級隊列(默認),其他:PriorityQueue(有序集合),FifoQueue(列表)、LifoQueue(列表)               
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'          

# 對保存到redis中的request對象進行序列化,默認使用pickle
SCHEDULER_SERIALIZER = "scrapy_redis.picklecompat"   

# 調度器中請求任務序列化后存放在redis中的key               
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'    

# 是否在關閉時候保留原來的調度器和去重記錄,True=保留,False=清空                     
SCHEDULER_PERSIST = True       

# 是否在開始之前清空 調度器和去重記錄,True=清空,False=不清空                                     
SCHEDULER_FLUSH_ON_START = False    

# 去調度器中獲取數據時,如果為空,最多等待時間(最后沒數據,未獲取到)。如果沒有則立刻返回會造成空循環次數過多,cpu占用率飆升                                
SCHEDULER_IDLE_BEFORE_CLOSE = 10           

# 去重規則,在redis中保存時對應的key                         
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'      

# 去重規則對應處理的類,將任務request_fingerprint(request)得到的字符串放入去重隊列            
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
View Code

 

4. RedisPipeline(持久化)

ITEM_PIPELINES = {   'scrapy_redis.pipelines.RedisPipeline': 300, }

#將item持久化到redis時,指定key和序列化函數
     
REDIS_ITEMS_KEY = '%(spider)s:items'
REDIS_ITEMS_SERIALIZER = 'json.dumps'

 

5. 從Redis中獲取起始URL

scrapy程序爬取目標站點,一旦爬取完畢后就結束了,如果目標站點更新內容了,我們想重新爬取,那么只能再重新啟動scrapy,非常麻煩
scrapy-redis提供了一種供,讓scrapy從redis中獲取起始url,如果沒有scrapy則過一段時間再來取而不會關閉
這樣我們就只需要寫一個簡單的腳本程序,定期往redis隊列里放入一個起始url。

#具體配置如下

#1、編寫爬蟲時,起始URL從redis的Key中獲取
REDIS_START_URLS_KEY = '%(name)s:start_urls'
    
#2、獲取起始URL時,去集合中獲取還是去列表中獲取?True,集合;False,列表
REDIS_START_URLS_AS_SET = False    # 獲取起始URL時,如果為True,則使用self.server.spop;如果為False,則使用self.server.lpop 

 

由於時間關系,詳細過程略...

 

本文參考鏈接:

https://www.cnblogs.com/yuanchenqi/articles/9509793.html#_label7

 

未完待續...

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM