python 緩存


Memcached

Memcached 是一個高性能的分布式內存對象緩存系統,用於動態Web應用以減輕數據庫負載。它通過在內存中緩存數據和對象來減少讀取數據庫的次數,從而提高動態、數據庫驅動網站的速度。Memcached基於一個存儲鍵/值對的hashmap。其守護進程(daemon )是用C寫的,但是客戶端可以用任何語言來編寫,並通過memcached協議與守護進程通信。

Python操作Memcached

安裝API

python操作Memcached使用Python-memcached模塊

下載安裝:https://pypi.python.org/pypi/python-memcached

連接操作

import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print ret

支持集群

python-memcached模塊原生支持集群操作,其原理是在內存維護一個主機列表,且集群中主機的權重值和主機在列表中重復出現的次數成正比

     主機    權重
    1.1.1.1   1
    1.1.1.2   2
    1.1.1.3   1
 
那么在內存中主機列表為:
    host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]

如果用戶根據如果要在內存中創建一個鍵值對(如:k1 = "v1"),那么要執行一下步驟:

  • 根據算法將 k1 轉換成一個數字
  • 將數字和主機列表長度求余數,得到一個值 N( 0 <= N < 列表長度 )
  • 在主機列表中根據 第2步得到的值為索引獲取主機,例如:host_list[N]
  • 連接 將第3步中獲取的主機,將 k1 = "v1" 放置在該服務器的內存中

add
添加一條鍵值對,如果已經存在的 key,重復執行add操作異常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.add('k1', 'v1')
# mc.add('k1', 'v2') # 報錯,對已經存在的key重復添加,失敗!!!

replace
replace 修改某個key的值,如果key不存在,則異常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
# 如果memcache中存在kkkk,則替換成功,否則一場
mc.replace('kkkk','999')

set set_multi

set            設置一個鍵值對,如果key不存在,則創建,如果key存在,則修改
set_multi   設置多個鍵值對,如果key不存在,則創建,如果key存在,則修改

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
 
mc.set('key0', 'wupeiqi')
 
mc.set_multi({'key1': 'val1', 'key2': 'val2'})

delete delete_multi

delete             在Memcached中刪除指定的一個鍵值對
delete_multi    在Memcached中刪除指定的多個鍵值對

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
 
mc.delete('key0')
mc.delete_multi(['key1', 'key2'])

get get_multi

get            獲取一個鍵值對
get_multi   獲取多一個鍵值對

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
 
val = mc.get('key0')
item_dict = mc.get_multi(["key1", "key2", "key3"])

append prepend

append    修改指定key的值,在該值 后面 追加內容
prepend   修改指定key的值,在該值 前面 插入內容

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
# k1 = "v1"
 
mc.append('k1', 'after')
# k1 = "v1after"
 
mc.prepend('k1', 'before')
# k1 = "beforev1after"

decr incr  

incr  自增,將Memcached中的某一個值增加 N ( N默認為1 )
decr 自減,將Memcached中的某一個值減少 N ( N默認為1 )

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.set('k1', '777')
 
mc.incr('k1')
# k1 = 778
 
mc.incr('k1', 10)
# k1 = 788
 
mc.decr('k1')
# k1 = 787
 
mc.decr('k1', 10)
# k1 = 777

gets  cas

如商城商品剩余個數,假設改值保存在memcache中,product_count = 900
A用戶刷新頁面從memcache中讀取到product_count = 900
B用戶刷新頁面從memcache中讀取到product_count = 900

如果A、B用戶均購買商品

A用戶修改商品剩余個數 product_count=899
B用戶修改商品剩余個數 product_count=899

如此一來緩存內的數據便不在正確,兩個用戶購買商品后,商品剩余還是 899
如果使用python的set和get來操作以上過程,那么程序就會如上述所示情況!

如果想要避免此情況的發生,只要使用 gets 和 cas 即可,如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc = memcache.Client(['10.211.55.4:12000'], debug=True, cache_cas=True)
 
v = mc.gets('product_count')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的設置將會執行失敗,剖出異常,從而避免非正常數據的產生
mc.cas('product_count', "899")

 

 

RabbitMQ

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

安裝API

pip install pika

對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。

#!/usr/bin/env python
import pika
 
# ######################### 生產者 #########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
#!/usr/bin/env python
import pika
 
# ########################## 消費者 ##########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
 
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

durable   消息不丟失

#生產者
#
!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close()
# 消費者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消息獲取順序

默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

發布訂閱

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

 exchange type = fanout

#生產者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
#消費者
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

關鍵字發送

 exchange type = direct

之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

#消費者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
#生產者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

模糊匹配

 exchange type = topic

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示可以匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞
發送者路由值              隊列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配
#消費者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
#生產者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

RabbitMQ rpc模式

#server , 生產者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import uuid
import json

conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.37.129.5'))
channel = conn.channel()


def callback(ch, method, properties, body):
    print(body)
    ch.queue_delete(method.routing_key)


def send_msg(hostname, cmd):
    """
    向隊列中發送命令,並等待命令在客戶端執行完成后獲取結果
    :param hostname:
    :param cmd:
    :return:
    """

    # 創建臨時隊列,用於存放客戶端執行命令后的返回值
    queue_name = str(uuid.uuid4())
    channel.queue_declare(queue=queue_name)

    # 向客戶端隊列中發送命令:封裝了命令以及執行結果存放的隊列名稱
    body = {'uuid': queue_name, 'content': cmd}
    channel.basic_publish(exchange='', routing_key=hostname, body=json.dumps(body))

    # 等待客戶端想隊列中發送執行結果,超時時間10s
    v = channel.consume(queue_name, inactivity_timeout=10)
    try:
        for method, properties, body in v:
            # 執行指定回調函數
            callback(channel, method, properties, body)
    except TypeError as e:
        # 如果超時,則刪除臨時隊列,不再獲取數據
        channel.queue_delete(queue_name)


if __name__ == '__main__':
    while True:

        hostname = input('hostname( c1.com 或 c2.com ):')
        cmd = input('cmd:')
        if cmd == 'exit':
            break
        send_msg(hostname, cmd)
    conn.close()

 

#agent 消費者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import json
import subprocess

conn = pika.BlockingConnection(pika.ConnectionParameters(host='10.37.129.5'))
channel = conn.channel()

channel.queue_declare(queue='c2.com')


def callback(ch, method, properties, body):
    body = json.loads(str(body, encoding='utf-8'))
    result = subprocess.getoutput(body['content'])
    result = 'c2.com:%s' % result
    ch.basic_publish(exchange='', routing_key=body['uuid'], body=result)


channel.basic_consume(callback, queue='c2.com', no_ack=True)

channel.start_consuming()

 

程序練習:

基於主機管理的程序把ssh換成rpc的連接方式

github:https://github.com/wangyufu/host_manage_rpc


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM