Memcached
Memcached 是一個高性能的分布式內存對象緩存系統,用於動態Web應用以減輕數據庫負載。它通過在內存中緩存數據和對象來減少讀取數據庫的次數,從而提高動態、數據庫驅動網站的速度。Memcached基於一個存儲鍵/值對的hashmap。其守護進程(daemon )是用C寫的,但是客戶端可以用任何語言來編寫,並通過memcached協議與守護進程通信。
Python操作Memcached
安裝API
python操作Memcached使用Python-memcached模塊
下載安裝:https://pypi.python.org/pypi/python-memcached
連接操作
import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) mc.set("foo", "bar") ret = mc.get('foo') print ret
支持集群
python-memcached模塊原生支持集群操作,其原理是在內存維護一個主機列表,且集群中主機的權重值和主機在列表中重復出現的次數成正比
主機 權重 1.1.1.1 1 1.1.1.2 2 1.1.1.3 1 那么在內存中主機列表為: host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]
如果用戶根據如果要在內存中創建一個鍵值對(如:k1 = "v1"),那么要執行一下步驟:
- 根據算法將 k1 轉換成一個數字
- 將數字和主機列表長度求余數,得到一個值 N( 0 <= N < 列表長度 )
- 在主機列表中根據 第2步得到的值為索引獲取主機,例如:host_list[N]
- 連接 將第3步中獲取的主機,將 k1 = "v1" 放置在該服務器的內存中
add
添加一條鍵值對,如果已經存在的 key,重復執行add操作異常
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) mc.add('k1', 'v1') # mc.add('k1', 'v2') # 報錯,對已經存在的key重復添加,失敗!!!
replace
replace 修改某個key的值,如果key不存在,則異常
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) # 如果memcache中存在kkkk,則替換成功,否則一場 mc.replace('kkkk','999')
set 和 set_multi
set 設置一個鍵值對,如果key不存在,則創建,如果key存在,則修改
set_multi 設置多個鍵值對,如果key不存在,則創建,如果key存在,則修改
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) mc.set('key0', 'wupeiqi') mc.set_multi({'key1': 'val1', 'key2': 'val2'})
delete 和 delete_multi
delete 在Memcached中刪除指定的一個鍵值對
delete_multi 在Memcached中刪除指定的多個鍵值對
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) mc.delete('key0') mc.delete_multi(['key1', 'key2'])
get 和 get_multi
get 獲取一個鍵值對
get_multi 獲取多一個鍵值對
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) val = mc.get('key0') item_dict = mc.get_multi(["key1", "key2", "key3"])
append 和 prepend
append 修改指定key的值,在該值 后面 追加內容
prepend 修改指定key的值,在該值 前面 插入內容
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) # k1 = "v1" mc.append('k1', 'after') # k1 = "v1after" mc.prepend('k1', 'before') # k1 = "beforev1after"
decr 和 incr
incr 自增,將Memcached中的某一個值增加 N ( N默認為1 )
decr 自減,將Memcached中的某一個值減少 N ( N默認為1 )
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True) mc.set('k1', '777') mc.incr('k1') # k1 = 778 mc.incr('k1', 10) # k1 = 788 mc.decr('k1') # k1 = 787 mc.decr('k1', 10) # k1 = 777
gets 和 cas
如商城商品剩余個數,假設改值保存在memcache中,product_count = 900
A用戶刷新頁面從memcache中讀取到product_count = 900
B用戶刷新頁面從memcache中讀取到product_count = 900
如果A、B用戶均購買商品
A用戶修改商品剩余個數 product_count=899
B用戶修改商品剩余個數 product_count=899
如此一來緩存內的數據便不在正確,兩個用戶購買商品后,商品剩余還是 899
如果使用python的set和get來操作以上過程,那么程序就會如上述所示情況!
如果想要避免此情況的發生,只要使用 gets 和 cas 即可,如:
#!/usr/bin/env python # -*- coding:utf-8 -*- import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True, cache_cas=True) v = mc.gets('product_count') # ... # 如果有人在gets之后和cas之前修改了product_count,那么,下面的設置將會執行失敗,剖出異常,從而避免非正常數據的產生 mc.cas('product_count', "899")
RabbitMQ
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
安裝API
pip install pika
對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。
#!/usr/bin/env python import pika # ######################### 生產者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
#!/usr/bin/env python import pika # ########################## 消費者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
durable 消息不丟失
#生產者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
# 消費者 #!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
消息獲取順序
默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
發布訂閱
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
exchange type = fanout
#生產者 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
#消費者 #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
關鍵字發送
exchange type = direct
之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
#消費者 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#生產者 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
模糊匹配
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
- # 表示可以匹配 0 個 或 多個 單詞
- * 表示只能匹配 一個 單詞
發送者路由值 隊列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
#消費者 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#生產者 #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
RabbitMQ rpc模式
#server , 生產者 #!/usr/bin/env python # -*- coding:utf-8 -*- import pika import uuid import json conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.37.129.5')) channel = conn.channel() def callback(ch, method, properties, body): print(body) ch.queue_delete(method.routing_key) def send_msg(hostname, cmd): """ 向隊列中發送命令,並等待命令在客戶端執行完成后獲取結果 :param hostname: :param cmd: :return: """ # 創建臨時隊列,用於存放客戶端執行命令后的返回值 queue_name = str(uuid.uuid4()) channel.queue_declare(queue=queue_name) # 向客戶端隊列中發送命令:封裝了命令以及執行結果存放的隊列名稱 body = {'uuid': queue_name, 'content': cmd} channel.basic_publish(exchange='', routing_key=hostname, body=json.dumps(body)) # 等待客戶端想隊列中發送執行結果,超時時間10s v = channel.consume(queue_name, inactivity_timeout=10) try: for method, properties, body in v: # 執行指定回調函數 callback(channel, method, properties, body) except TypeError as e: # 如果超時,則刪除臨時隊列,不再獲取數據 channel.queue_delete(queue_name) if __name__ == '__main__': while True: hostname = input('hostname( c1.com 或 c2.com ):') cmd = input('cmd:') if cmd == 'exit': break send_msg(hostname, cmd) conn.close()
#agent 消費者 #!/usr/bin/env python # -*- coding:utf-8 -*- import pika import json import subprocess conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.37.129.5')) channel = conn.channel() channel.queue_declare(queue='c2.com') def callback(ch, method, properties, body): body = json.loads(str(body, encoding='utf-8')) result = subprocess.getoutput(body['content']) result = 'c2.com:%s' % result ch.basic_publish(exchange='', routing_key=body['uuid'], body=result) channel.basic_consume(callback, queue='c2.com', no_ack=True) channel.start_consuming()
程序練習:
基於主機管理的程序把ssh換成rpc的連接方式
