消息隊列RabbitMQ、緩存數據庫Redis


 

1.RabbitMQ消息隊列

 

1.1 RabbitMQ簡介

  AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
  AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
  RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
  下面將重點介紹RabbitMQ中的一些基礎概念,了解了這些概念,是使用好RabbitMQ的基礎。

 


 

1.2 安裝RabbitMQ和python的pika模塊

1.2.1 安裝RabbitMQ

(1)安裝erlang平台(RabbitMQ的依賴平台)

1.安裝依賴文件
yum install ncurses-devel

2.下載源文件
wget http://www.erlang.org/download/otp_src_19.1.tar.gz
若失敗,到地址:http://erlang.org/download/去手動下載

3.解壓源文件壓縮包
tar -xvf otp_src_19.1.tar.gz
(tar 參數含義: bz2格式用j;gz格式用z;c是創建;x是解壓縮;v是詳細信息;f是指定文件)

4.進入解壓后的目錄
cd otp_src_19.1

5.依次執行以下命令:
./configure -prefix=/usr/local/erlang 就會開始編譯安裝 會編譯到 /usr/local/erlang 下
make && make install

6.修改/etc/profile文件,增加下面的環境變量:
vim /etc/pofile
#set erlang environment
export PATH=$PATH:/usr/local/erlang/bin
source profile使得文件生效(用export 查看path中是否有剛剛添加的環境變量)

7.安裝完成后執行erl看是否能打開eshell,用’halt().’退出,注意:“.”是erlang的結束符

(2)安裝RabbitMQ

wget -c http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm

rpm -ivh --nodeps rabbitmq-server-3.6.0-1.noarch.rpm

1.2.2 安裝pika

pip install pika 或者easy_install pika

 


 

1.3 最簡單的發送/接收消息隊列模型

producer:

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 
 4 import pika
 5 
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7     'localhost'))   # 相當於建立一個socket連接
 8 channel = connection.channel()
 9 # 聲明queue
10 channel.queue_declare(queue='hello')
11 # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
12 channel.basic_publish(exchange='',
13                       routing_key='hello',
14                       body='你好!'.encode("utf-8"))
15 print(" 發送 '你好!'")
16 connection.close()

consumer:

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 import pika
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6     'localhost'))
 7 channel = connection.channel()
 8 
 9 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
10 # We could avoid that if we were sure that the queue already exists. For example if send.py program
11 # was run before. But we're not yet sure which program to run first. In such cases it's a good
12 # practice to repeat declaring the queue in both programs.
13 channel.queue_declare(queue='hello')
14 
15 
16 def callback(ch, method, properties, body):
17     print(" 收到: %r" % body.decode("utf-8"))
18 
19 channel.basic_consume(callback,
20                       queue='hello',
21                       no_ack=True)
22 print(' 等待。。。')
23 channel.start_consuming()

注意代碼中的英文注釋,特別是為什么又一次聲明queue。。。


 

 

1.4 輪詢原理

  1.3中如果依次運行兩個consumer,分別記consumer1、consumer2,那么producer第一次發消息是consumer1收到,第二次發是consumer2收到,第三次發又是consumer1收到......也就是說,rabbitMQ是依次把消息發給consumer端。


 

1.5 消息持久化

producer

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 import pika
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))  # 相當於建立一個socket連接
 6 channel = connection.channel()  # 定義一個管道
 7 # 聲明Queue
 8 channel.queue_declare(queue="hello2",durable=True)   # durable=True 是把這個隊列持久化,如果rabbitMQ掛掉,隊列還在;如果
 9                                                       # 隊列中的消息沒有持久化,則消息會丟失
10 channel.basic_publish(exchange="",
11                       routing_key="hello2",
12                       body="Hi,how are you?",
13                       properties=pika.BasicProperties(
14                           delivery_mode=2,))      # properties=pika.BasicProperties(delivery_mode=2,) 這是隊列中的消息持久化
15 print("發送了一句話。。。")
16 connection.close()

  上述代碼中,第8行只是隊列持久化,如果rabbitMQ掛掉,隊列還在;但如果隊列中的消息沒有持久化,則消息會丟失。

 


 

1.6 消息公平分發

  如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

producer:

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)
 9  
10 message = ' '.join(sys.argv[1:]) or "Hello World!"
11 channel.basic_publish(exchange='',
12                       routing_key='task_queue',
13                       body=message,
14                       properties=pika.BasicProperties(
15                          delivery_mode = 2, # make message persistent
16                       ))
17 print(" [x] Sent %r" % message)
18 connection.close()

生產者端其實並沒什么變化,只是用了消息持久化。

consumer:

 1 import pika
 2 import time
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)
 9 print(' [*] Waiting for messages. To exit press CTRL+C')
10  
11 def callback(ch, method, properties, body):
12     print(" [x] Received %r" % body)
13     time.sleep(body.count(b'.'))
14     print(" [x] Done")
15     ch.basic_ack(delivery_tag = method.delivery_tag)
16  
17 channel.basic_qos(prefetch_count=1)
18 channel.basic_consume(callback,
19                       queue='task_queue')
20  
21 channel.start_consuming()

注意15行,必須要手動回復。


 

1.7 消息發布/訂閱(Publish/Subscribe)

 之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息


fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

   表達式符號說明:#代表一個或多個字符,*代表任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 

headers: 通過headers 來決定把消息發給哪些queue

publisher:

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 
 4 import pika
 5 
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7     host='localhost'))
 8 channel = connection.channel()
 9 
10 #  不需要聲明隊列,這里是交換器
11 channel.exchange_declare(exchange='logs',
12                          type='fanout')   # fanout的意思是廣播
13 
14 message = "Info: How  are you???"
15 channel.basic_publish(exchange='logs',
16                       routing_key='',   # 沒有隊列,這里也必須寫為空
17                       body=message)
18 print(" [x] Sent %r" % message)
19 connection.close()

subscriber

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 
 4 import pika
 5 
 6 connection = pika.BlockingConnection(pika.ConnectionParameters(
 7     host='localhost'))
 8 channel = connection.channel()
 9 
10 channel.exchange_declare(exchange='logs',
11                          type='fanout')
12 
13 result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
14 # exclusive唯一的
15 queue_name = result.method.queue
16 
17 channel.queue_bind(exchange='logs',
18                    queue=queue_name)    # 將隨機分配的隊列綁定到交換機logs
19 
20 print(' [*] Waiting for logs. To exit press CTRL+C')
21 
22 
23 def callback(ch, method, properties, body):
24     print(" [x] %r" % body)
25 
26 
27 channel.basic_consume(callback,
28                       queue=queue_name,
29                       no_ack=True)
30 
31 channel.start_consuming()

 

1.8 有選擇地接收消息(exchange type = direct)

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

 

publisher

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 
 4 import pika
 5 import sys
 6 
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(
 8     host='localhost'))
 9 channel = connection.channel()
10 
11 channel.exchange_declare(exchange='direct_logs',
12                          type='direct')
13 
14 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
15 message = ' '.join(sys.argv[2:]) or 'Hello World!'
16 channel.basic_publish(exchange='direct_logs',
17                       routing_key=severity,
18                       body=message)
19 print(" [x] Sent %r:%r" % (severity, message))
20 connection.close()

subscriber

 1 #! /usr/bin/env python3
 2 # -*- coding:utf-8 -*-
 3 
 4 import pika
 5 import sys
 6 
 7 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 8 channel = connection.channel()
 9 
10 channel.exchange_declare(exchange='direct_logs',type='direct')
11 
12 result = channel.queue_declare(exclusive=True)
13 queue_name = result.method.queue
14 
15 severities = sys.argv[1:]
16 if not severities:
17     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
18     sys.exit(1)
19 
20 for severity in severities:
21     channel.queue_bind(exchange='direct_logs',
22                        queue=queue_name,
23                        routing_key=severity)
24 
25 print(' [*] Waiting for logs. To exit press CTRL+C')
26 
27 
28 def callback(ch, method, properties, body):
29     print(" [x] %r:%r" % (method.routing_key, body))
30 
31 
32 channel.basic_consume(callback,
33                       queue=queue_name,
34                       no_ack=True)
35 
36 channel.start_consuming()

 

1.9 更細致的消息過濾

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
 
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
 
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
 
channel.start_consuming()

 

1.10 rpc

RPC server

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4         host='localhost'))
 5  
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='rpc_queue')
 9  
10 def fib(n):
11     if n == 0:
12         return 0
13     elif n == 1:
14         return 1
15     else:
16         return fib(n-1) + fib(n-2)
17  
18 def on_request(ch, method, props, body):
19     n = int(body)
20  
21     print(" [.] fib(%s)" % n)
22     response = fib(n)
23  
24     ch.basic_publish(exchange='',
25                      routing_key=props.reply_to,
26                      properties=pika.BasicProperties(correlation_id = \
27                                                          props.correlation_id),
28                      body=str(response))
29     ch.basic_ack(delivery_tag = method.delivery_tag)
30  
31 channel.basic_qos(prefetch_count=1)
32 channel.basic_consume(on_request, queue='rpc_queue')
33  
34 print(" [x] Awaiting RPC requests")
35 channel.start_consuming()

RPC client

 1 import pika
 2 import uuid
 3  
 4 class FibonacciRpcClient(object):
 5     def __init__(self):
 6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
 7                 host='localhost'))
 8  
 9         self.channel = self.connection.channel()
10  
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue
13  
14         self.channel.basic_consume(self.on_response, no_ack=True,
15                                    queue=self.callback_queue)
16  
17     def on_response(self, ch, method, props, body):
18         if self.corr_id == props.correlation_id:
19             self.response = body
20  
21     def call(self, n):
22         self.response = None
23         self.corr_id = str(uuid.uuid4())
24         self.channel.basic_publish(exchange='',
25                                    routing_key='rpc_queue',
26                                    properties=pika.BasicProperties(
27                                          reply_to = self.callback_queue,
28                                          correlation_id = self.corr_id,
29                                          ),
30                                    body=str(n))
31         while self.response is None:
32             self.connection.process_data_events()
33         return int(self.response)
34  
35 fibonacci_rpc = FibonacciRpcClient()
36  
37 print(" [x] Requesting fib(30)")
38 response = fibonacci_rpc.call(30)
39 print(" [.] Got %r" % response)

2 Redis

Redis是一個key-value存儲系統。和Memcached類似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。在此基礎上,redis支持各種不同方式的排序。與memcached一樣,為了保證效率,數據都是緩存在內存中。區別的是redis會周期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。

2.1 python操作redis

2.1.1 操作模式

redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向后兼容舊版本的redis-py。

1 import redis
2  
3 r = redis.Redis(host='10.211.55.4', port=6379)
4 r.set('foo', 'Bar')
5 print (r.get('foo'))

2.1.2 連接池

redis-py使用connection pool來管理對一個redis server的所有連接,避免每次建立、釋放連接的開銷。默認,每個Redis實例都會維護一個自己的連接池。可以直接建立一個連接池,然后作為參數Redis,這樣就可以實現多個Redis實例共享一個連接池。

import redis
 
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
 
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print(r.get('foo')) 

2.1.3 String 操作

String操作,redis中的String在在內存中按照一個name對應一個value來存儲。

name             value

n1 ----------->  v1

n2 ----------->  v2

n3 ----------->  v3

set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中設置值,默認,不存在則創建,存在則修改

參數:
      ex,過期時間(秒)
      px,過期時間(毫秒)
      nx,如果設置為True,則只有name不存在時,當前set操作才執行
      xx,如果設置為True,則只有name存在時,崗前set操作才執行

setnx(name, value)

設置值,只有name不存在時,執行設置操作(添加)

 

setex(name, value, time)

# 設置值

# 參數:

     # time,過期時間(數字秒 或 timedelta對象)
 
psetex(name, time_ms, value)
# 設置值
# 參數:
     # time_ms,過期時間(數字毫秒 或 timedelta對象)
 
mset(*args, **kwargs)
批量設置值
如:
     mset(k1= 'v1' , k2= 'v2' )
    
     mget({ 'k1' 'v1' 'k2' 'v2' })
 
get(name)   獲取值
 
mget(keys, *args)
批量獲取
如:
     mget( 'ylr' 'zingp' )
    
     r.mget([ 'ylr' 'zigp' ])
 
getset(name, value)  設置新值並獲取原來的值
 
getrange(key, start, end)
# 獲取子序列(根據字節獲取,非字符)
# 參數:
     # name,Redis 的 name
     # start,起始位置(字節)
     # end,結束位置(字節)
# 如: "劉亦菲" ,0-3表示 "劉"
 
setrange(name, offset, value)
# 修改字符串內容,從指定字符串索引開始向后替換(新值太長時,則向后添加)
# 參數:
     # offset,字符串的索引,字節(一個漢字三個字節)
     # value,要設置的值
 
setbit(name, offset, value)
# 對name對應值的二進制表示的位進行操作
 
# 參數:
    # name,redis的name
    # offset,位的索引(將值變換成二進制后再進行索引)
    # value,值只能是 1 或 0
 
# 注:如果在Redis中有一個對應: n1 = "foo",
        那么字符串foo的二進制表示為:01100110 01101111 01101111
    所以,如果執行 setbit('n1', 7, 1),則就會將第7位設置為1,
        那么最終二進制則變成 01100111 01101111 01101111,即:"goo"
 
# 擴展,轉換二進制表示:
 
    # source = "勒布朗"
    source = "foo"
 
    for i in source:
        num = ord(i)
        print bin(num).replace('b','')
 
    特別的,如果source是漢字 "勒布朗"怎么辦?
    答:對於utf-8,每一個漢字占 3 個字節,那么 "勒布朗" 則有 9個字節
       對於漢字,for循環時候會按照 字節 迭代,那么在迭代時,將每一個字節轉換 十進制數,然后再將十進制數轉換成二進制
        11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
        -------------------------- ----------------------------- -----------------------------
                    勒                         布                           朗

getbit(name, offset)  # 獲取name對應的值的二進制表示中的某位的值 (0或1)

 

bitcount(key, start=None, end=None)

# 獲取name對應的值的二進制表示中 1 的個數

# 參數:
     # key,Redis的name
     # start,位起始位置
     # end,位結束位置
 
bitop(operation, dest, *keys)
# 獲取多個值,並將值做位運算,將最后的結果保存至新的name對應的值
 
# 參數:
     # operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)
     # dest, 新的Redis的name
     # *keys,要查找的Redis的name
# 如:
     bitop( "AND" 'new_name' 'n1' 'n2' 'n3' )
     # 獲取Redis中n1,n2,n3對應的值,然后講所有的值做位運算(求並集),然后將結果保存 new_name 對應的值中
 
strlen(name)  # 返回name對應值的字節長度(一個漢字3個字節)
 
incr(self, name, amount=1)
# 自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(必須是整數)
# 注:同incrby
 
incrbyfloat(self, name, amount=1.0)
# 自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。
 
# 參數:
     # name,Redis的name
     # amount,自增數(浮點型)
 
decr(self, name, amount=1)
# 自減 name對應的值,當name不存在時,則創建name=amount,否則,則自減。
# 參數:
     # name,Redis的name
     # amount,自減數(整數)
 
append(key, value)
# 在redis name對應的值后面追加內容
 
# 參數:
     key, redis的name
     value, 要追加的字符串
 

2.1.4 Hash操作,redis中Hash在內存中的存儲格式:

name                  hash
                       k1-->v1
n1  -----------> k2 -->v2
                       k3-->v3
hset(name, key, value)
# name對應的hash中設置一個鍵值對(不存在,則創建;否則,修改)
# 參數:
     # name,redis的name
     # key,name對應的hash中的key
     # value,name對應的hash中的value
# 注:
     # hsetnx(name, key, value),當name對應的hash中不存在當前key時則創建(相當於添加)
 
hmset(name, mapping)
# 在name對應的hash中批量設置鍵值對 
# 參數:
     # name,redis的name
     # mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
# 如:
     # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
 
hget(name,key)    # 在name對應的hash中獲取根據key獲取value
 
hmget(name, keys, *args)
# 在name對應的hash中獲取多個key的值
# 參數:
     # name,reids對應的name
     # keys,要獲取key集合,如:['k1', 'k2', 'k3']
     # *args,要獲取的key,如:k1,k2,k3  
# 如:
     # r.mget('xx', ['k1', 'k2'])
     # 或
     # print r.hmget('xx', 'k1', 'k2')
 
hgetall(name)   獲取name對應 hash 的所有鍵值
 
hlen(name)  # 獲取name對應的hash中鍵值對的個數
 
hkeys(name)  # 獲取name對應的hash中所有的key的值
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM