select server
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
# Sockets from which we expect to read
inputs = [ server ]
# Sockets to which we expect to write
outputs = [ ]
message_queues = {}
while inputs:
# Wait for at least one of the sockets to be ready for processing
print( '\nwaiting for the next event')
readable, writable, exceptional = select.select(inputs, outputs, inputs,2)
# Handle inputs
for s in readable:
if s is server: #new connection
# A "readable" server socket is ready to accept a connection
connection, client_address = s.accept()
print('new connection from', client_address)
connection.setblocking(False)
inputs.append(connection)
# Give the connection a queue for data we want to send
message_queues[connection] = queue.Queue()
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
else:
# Interpret empty result as closed connection
print('closing', client_address, 'after reading no data')
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,所以這時候如果這個客戶端的連接對象還在outputs列表中,就把它刪掉
inputs.remove(s) #inputs中也刪除掉
s.close() #把這個連接關閉掉
# Remove message queue
del message_queues[s]
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking for writability.
print('output queue for', s.getpeername(), 'is empty')
outputs.remove(s)
else:
print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
s.send(next_msg)
# Handle "exceptional conditions"
for s in exceptional:
print('handling exceptional condition for', s.getpeername() )
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
select client
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import sys
messages = [ 'This is the message. ',
'It will be sent ',
'in parts.',
]
server_address = ('localhost', 10000)
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print >>sys.stderr, 'connecting to %s port %s' % server_address
for s in socks:
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message)
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data)
if not data:
print >>sys.stderr, 'closing socket', s.getsockname()
s.close()
###########################################################################################
http://www.cnblogs.com/alex3714/articles/5248247.html
事件驅動
簡而言之,事件驅動分為二個部分:第一,注冊事件;第二,觸發事件。 如果想使用某一個事件驅動框架,得按照它定義的規則,自己寫事件,注冊,然后觸發。
自定義事件驅動框架,命名為:“qiang”:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# event_drive.py
event_list = []
def run():
for event in event_list:
obj = event()
obj.execute()
class BaseHandler(object):
"""
用戶必須繼承該類,從而規范所有類的方法(類似於接口的功能)
"""
def execute(self):
raise Exception('you must overwrite execute')
最牛逼的事件驅動框架
程序員使用“qiang”:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from source import event_drive
class MyHandler(event_drive.BaseHandler):
def execute(self):
print 'event-drive execute MyHandler'
event_drive.event_list.append(MyHandler)
event_drive.run()
論事件驅動與異步IO
事件驅動編程是一種編程范式,這里程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程范式是(單線程)同步以及多線程編程。
讓我們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展示了隨着時間的推移,這三種模式下程序所做的工作。這個程序有3個任務需要完成,每個任務都在等待I/O操作時阻塞自身。阻塞在I/O操作上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。如果某個任務因為I/O而阻塞,其他所有的任務都必須等待,直到它完成之后它們才能依次執行。這種明確的執行順序和串行化處理的行為是很容易推斷得出的。如果任務之間並沒有互相依賴的關系,但仍然需要互相等待的話這就使得程序不必要的降低了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操作系統來管理,在多處理器系統上可以並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其他線程得以繼續執行。與完成類似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,因為這類程序不得不通過線程同步機制如鎖、可重入函數、線程局部存儲或者其他機制來處理線程安全問題,如果實現不當就會導致出現微妙且令人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其他昂貴的操作時,注冊一個回調到事件循環中,然后當I/O操作完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢所有的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡可能的得以執行而不需要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行為,因為程序員不需要關心線程安全問題。
當我們面對如下的環境時,事件驅動模型通常是一個好的選擇:
1. 程序中有許多任務,而且…
2. 任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)而且…
3. 在等待事件到來時,某些任務會阻塞。
當應用程序需要在任務間共享可變的數據時,這也是一個不錯的選擇,因為這里不需要采用同步處理。
網絡應用程序通常都有上述這些特點,這使得它們能夠很好的契合事件驅動編程模型。
Twsited異步網絡框架
Twisted是一個事件驅動的網絡框架,其中包含了諸多功能,例如:網絡協議、線程、數據庫管理、網絡操作、電子郵件等
Protocols
Protocols描述了如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含如下的方法:
makeConnection 在transport對象和服務器之間建立一條連接
connectionMade 連接建立起來后調用
dataReceived 接收數據時調用
connectionLost 關閉連接時調用
Transports
Transports代表網絡中兩個通信結點之間的連接。Transports負責描述連接的細節,比如連接是面向流式的還是面向數據報的,流控以及可靠性。TCP、UDP和Unix套接字可作為transports的例子。它們被設計為“滿足最小功能單元,同時具有最大程度的可復用性”,而且從協議實現中分離出來,這讓許多協議可以采用相同類型的傳輸。Transports實現了ITransports接口,它包含如下的方法:
write 以非阻塞的方式按順序依次將數據寫到物理連接上
writeSequence 將一個字符串列表寫到物理連接上
loseConnection 將所有掛起的數據寫入,然后關閉連接
getPeer 取得連接中對端的地址信息
getHost 取得連接中本端的地址信息將transports從協議中分離出來也使得對這兩個層次的測試變得更加簡單。可以通過簡單地寫入一個字符串來模擬傳輸,用這種方式來檢查。
Twisted深入
http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/
http://blog.csdn.net/hanhuili/article/details/9389433
optparse --- 解析輸入參數的模塊
azureuser@MyUbuntuVM:~/pythonScripts$ cat optparseExample.py
#!/usr/bin/env python
import optparse
parser = optparse.OptionParser()
parser.add_option("-f", "--file", dest="filenames",help="file help info")
parser.add_option("-q", "--quiet", dest="quietnames",help="quiet help info")
(options,args) = parser.parse_args(['-f','aa','dd','-qbb','cc'])
print(options,args)
(options,args) = parser.parse_args()
print(options,args)
azureuser@MyUbuntuVM:~/pythonScripts$ python3 optparseExample.py
{'quietnames': 'bb', 'filenames': 'aa'} ['dd', 'cc']
{'quietnames': None, 'filenames': None} []
azureuser@MyUbuntuVM:~/pythonScripts$ python3 optparseExample.py -f ab -q abc
{'filenames': 'aa', 'quietnames': 'bb'} ['dd', 'cc']
{'filenames': 'ab', 'quietnames': 'abc'} []
azureuser@MyUbuntuVM:~/pythonScripts$ python3 optparseExample.py -f ab 11 -q abc 22
{'quietnames': 'bb', 'filenames': 'aa'} ['dd', 'cc']
{'quietnames': 'abc', 'filenames': 'ab'} ['11', '22']
azureuser@MyUbuntuVM:~/pythonScripts$
#############################################################################################################
http://www.cnblogs.com/wupeiqi/articles/5132791.html
redis
pip3 install redis
redis commands:
./redis-cli
keys * -- > check all keys
set name aa ex 5 --- > timeout = 5
save -- > save values to disk
redis是一個key-value存儲系統。和Memcached類似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。在此基礎上,redis支持各種不同方式的排序。與memcached一樣,為了保證效率,數據都是緩存在內存中。區別的是redis會周期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。
一、Redis安裝和基本使用
wget http://download.redis.io/releases/redis-3.0.6.tar.gz
tar xzf redis-3.0.6.tar.gz
cd redis-3.0.6
make
啟動服務端
1
src/redis-server
啟動客戶端
src/redis-cli
redis> set foo bar
OK
redis> get foo
"bar"
二、Python操作Redis
sudo pip install redis
or
sudo easy_install redis
or
源碼安裝
詳見:https://github.com/WoLpH/redis-py
API使用
redis-py 的API的使用可以分類為:
連接方式
連接池
操作
String 操作
Hash 操作
List 操作
Set 操作
Sort Set 操作
管道
發布訂閱
1、操作模式
redis-py提供兩個類Redis和StrictRedis用於實現Redis的命令,StrictRedis用於實現大部分官方的命令,並使用官方的語法和命令,Redis是StrictRedis的子類,用於向后兼容舊版本的redis-py。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
r = redis.Redis(host='10.211.55.4', port=6379)
r.set('foo', 'Bar')
print r.get('foo')
2、連接池
redis-py使用connection pool來管理對一個redis server的所有連接,避免每次建立、釋放連接的開銷。默認,每個Redis實例都會維護一個自己的連接池。可以直接建立一個連接池,然后作為參數Redis,這樣就可以實現多個Redis實例共享一個連接池。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print r.get('foo')
3、操作
String操作,redis中的String在在內存中按照一個name對應一個value來存儲。如圖:
set(name, value, ex=None, px=None, nx=False, xx=False)
在Redis中設置值,默認,不存在則創建,存在則修改
參數:
ex,過期時間(秒)
px,過期時間(毫秒)
nx,如果設置為True,則只有name不存在時,當前set操作才執行
xx,如果設置為True,則只有name存在時,崗前set操作才執行
setnx(name, value)
設置值,只有name不存在時,執行設置操作(添加)
setex(name, value, time)
# 設置值
# 參數:
# time,過期時間(數字秒 或 timedelta對象)
psetex(name, time_ms, value)
# 設置值
# 參數:
# time_ms,過期時間(數字毫秒 或 timedelta對象)
mset(*args, **kwargs)
批量設置值
如:
mset(k1='v1', k2='v2')
或
mget({'k1': 'v1', 'k2': 'v2'})
get(name)
獲取值
mget(keys, *args)
批量獲取
如:
mget('ylr', 'wupeiqi')
或
r.mget(['ylr', 'wupeiqi'])
getset(name, value)
設置新值並獲取原來的值
getrange(key, start, end)
# 獲取子序列(根據字節獲取,非字符)
# 參數:
# name,Redis 的 name
# start,起始位置(字節)
# end,結束位置(字節)
# 如: "武沛齊" ,0-3表示 "武"
setrange(name, offset, value)
# 修改字符串內容,從指定字符串索引開始向后替換(新值太長時,則向后添加)
# 參數:
# offset,字符串的索引,字節(一個漢字三個字節)
# value,要設置的值
setbit(name, offset, value)
# 對name對應值的二進制表示的位進行操作
# 參數:
# name,redis的name
# offset,位的索引(將值變換成二進制后再進行索引)
# value,值只能是 1 或 0
# 注:如果在Redis中有一個對應: n1 = "foo",
那么字符串foo的二進制表示為:01100110 01101111 01101111
所以,如果執行 setbit('n1', 7, 1),則就會將第7位設置為1,
那么最終二進制則變成 01100111 01101111 01101111,即:"goo"
# 擴展,轉換二進制表示:
# source = "武沛齊"
source = "foo"
for i in source:
num = ord(i)
print bin(num).replace('b','')
特別的,如果source是漢字 "武沛齊"怎么辦?
答:對於utf-8,每一個漢字占 3 個字節,那么 "武沛齊" 則有 9個字節
對於漢字,for循環時候會按照 字節 迭代,那么在迭代時,將每一個字節轉換 十進制數,然后再將十進制數轉換成二進制
11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
-------------------------- ----------------------------- -----------------------------
武 沛 齊
getbit(name, offset)
# 獲取name對應的值的二進制表示中的某位的值 (0或1)
bitcount(key, start=None, end=None)
# 獲取name對應的值的二進制表示中 1 的個數
# 參數:
# key,Redis的name
# start,位起始位置
# end,位結束位置
bitop(operation, dest, *keys)
# 獲取多個值,並將值做位運算,將最后的結果保存至新的name對應的值
# 參數:
# operation,AND(並) 、 OR(或) 、 NOT(非) 、 XOR(異或)
# dest, 新的Redis的name
# *keys,要查找的Redis的name
# 如:
bitop("AND", 'new_name', 'n1', 'n2', 'n3')
# 獲取Redis中n1,n2,n3對應的值,然后講所有的值做位運算(求並集),然后將結果保存 new_name 對應的值中
strlen(name)
# 返回name對應值的字節長度(一個漢字3個字節)
incr(self, name, amount=1)
# 自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。
# 參數:
# name,Redis的name
# amount,自增數(必須是整數)
# 注:同incrby
incrbyfloat(self, name, amount=1.0)
# 自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。
# 參數:
# name,Redis的name
# amount,自增數(浮點型)
decr(self, name, amount=1)
# 自減 name對應的值,當name不存在時,則創建name=amount,否則,則自減。
# 參數:
# name,Redis的name
# amount,自減數(整數)
append(key, value)
# 在redis name對應的值后面追加內容
# 參數:
key, redis的name
value, 要追加的字符串
Hash操作,redis中Hash在內存中的存儲格式如下圖:
hset(name, key, value)
# name對應的hash中設置一個鍵值對(不存在,則創建;否則,修改)
# 參數:
# name,redis的name
# key,name對應的hash中的key
# value,name對應的hash中的value
# 注:
# hsetnx(name, key, value),當name對應的hash中不存在當前key時則創建(相當於添加)
hmset(name, mapping)
# 在name對應的hash中批量設置鍵值對
# 參數:
# name,redis的name
# mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
# 如:
# r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
hget(name,key)
# 在name對應的hash中獲取根據key獲取value
hmget(name, keys, *args)
# 在name對應的hash中獲取多個key的值
# 參數:
# name,reids對應的name
# keys,要獲取key集合,如:['k1', 'k2', 'k3']
# *args,要獲取的key,如:k1,k2,k3
# 如:
# r.mget('xx', ['k1', 'k2'])
# 或
# print r.hmget('xx', 'k1', 'k2')
hgetall(name)
獲取name對應hash的所有鍵值
hlen(name)
# 獲取name對應的hash中鍵值對的個數
hkeys(name)
# 獲取name對應的hash中所有的key的值
hvals(name)
# 獲取name對應的hash中所有的value的值
hexists(name, key)
# 檢查name對應的hash是否存在當前傳入的key
hdel(name,*keys)
# 將name對應的hash中指定key的鍵值對刪除
hincrby(name, key, amount=1)
# 自增name對應的hash中的指定key的值,不存在則創建key=amount
# 參數:
# name,redis中的name
# key, hash對應的key
# amount,自增數(整數)
hincrbyfloat(name, key, amount=1.0)
# 自增name對應的hash中的指定key的值,不存在則創建key=amount
# 參數:
# name,redis中的name
# key, hash對應的key
# amount,自增數(浮點數)
# 自增name對應的hash中的指定key的值,不存在則創建key=amount
hscan(name, cursor=0, match=None, count=None)
# 增量式迭代獲取,對於數據大的數據非常有用,hscan可以實現分片的獲取數據,並非一次性將數據全部獲取完,從而放置內存被撐爆
# 參數:
# name,redis的name
# cursor,游標(基於游標分批取獲取數據)
# match,匹配指定key,默認None 表示所有的key
# count,每次分片最少獲取個數,默認None表示采用Redis的默認分片個數
# 如:
# 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
# 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
# ...
# 直到返回值cursor的值為0時,表示數據已經通過分片獲取完畢
hscan_iter(name, match=None, count=None)
# 利用yield封裝hscan創建生成器,實現分批去redis中獲取數據
# 參數:
# match,匹配指定key,默認None 表示所有的key
# count,每次分片最少獲取個數,默認None表示采用Redis的默認分片個數
# 如:
# for item in r.hscan_iter('xx'):
# print item
List操作,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:
lpush(name,values)
# 在name對應的list中添加元素,每個新的元素都添加到列表的最左邊
# 如:
# r.lpush('oo', 11,22,33)
# 保存順序為: 33,22,11
# 擴展:
# rpush(name, values) 表示從右向左操作
lpushx(name,value)
# 在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
# 更多:
# rpushx(name, value) 表示從右向左操作
llen(name)
# name對應的list元素的個數
linsert(name, where, refvalue, value))
# 在name對應的列表的某一個值前或后插入一個新值
# 參數:
# name,redis的name
# where,BEFORE或AFTER
# refvalue,標桿值,即:在它前后插入數據
# value,要插入的數據
r.lset(name, index, value)
# 對name對應的list中的某一個索引位置重新賦值
# 參數:
# name,redis的name
# index,list的索引位置
# value,要設置的值
r.lrem(name, value, num)
# 在name對應的list中刪除指定的值
# 參數:
# name,redis的name
# value,要刪除的值
# num, num=0,刪除列表中所有的指定值;
# num=2,從前到后,刪除2個;
# num=-2,從后向前,刪除2個
lpop(name)
# 在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
# 更多:
# rpop(name) 表示從右向左操作
lindex(name, index)
在name對應的列表中根據索引獲取列表元素
lrange(name, start, end)
# 在name對應的列表分片獲取數據
# 參數:
# name,redis的name
# start,索引的起始位置
# end,索引結束位置
ltrim(name, start, end)
# 在name對應的列表中移除沒有在start-end索引之間的值
# 參數:
# name,redis的name
# start,索引的起始位置
# end,索引結束位置
rpoplpush(src, dst)
# 從一個列表取出最右邊的元素,同時將其添加至另一個列表的最左邊
# 參數:
# src,要取數據的列表的name
# dst,要添加數據的列表的name
blpop(keys, timeout)
# 將多個列表排列,按照從左到右去pop對應列表的元素
# 參數:
# keys,redis的name的集合
# timeout,超時時間,當元素所有列表的元素獲取完之后,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
# 更多:
# r.brpop(keys, timeout),從右向左獲取數據
brpoplpush(src, dst, timeout=0)
# 從一個列表的右側移除一個元素並將其添加到另一個列表的左側
# 參數:
# src,取出並要移除元素的列表對應的name
# dst,要插入元素的列表對應的name
# timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞
自定義增量迭代
# 由於redis類庫中沒有提供對列表元素的增量迭代,如果想要循環name對應的列表的所有元素,那么就需要:
# 1、獲取name對應的所有列表
# 2、循環列表
# 但是,如果列表非常大,那么就有可能在第一步時就將程序的內容撐爆,所有有必要自定義一個增量迭代的功能:
def list_iter(name):
"""
自定義redis列表增量迭代
:param name: redis中的name,即:迭代name對應的列表
:return: yield 返回 列表元素
"""
list_count = r.llen(name)
for index in xrange(list_count):
yield r.lindex(name, index)
# 使用
for item in list_iter('pp'):
print item
Set操作,Set集合就是不允許重復的列表
sadd(name,values)
# name對應的集合中添加元素
scard(name)
獲取name對應的集合中元素個數
sdiff(keys, *args)
在第一個name對應的集合中且不在其他name對應的集合的元素集合
sdiffstore(dest, keys, *args)
# 獲取第一個name對應的集合中且不在其他name對應的集合,再將其新加入到dest對應的集合中
sinter(keys, *args)
# 獲取多一個name對應集合的並集
sinterstore(dest, keys, *args)
# 獲取多一個name對應集合的並集,再講其加入到dest對應的集合中
sismember(name, value)
# 檢查value是否是name對應的集合的成員
smembers(name)
# 獲取name對應的集合的所有成員
smove(src, dst, value)
# 將某個成員從一個集合中移動到另外一個集合
spop(name)
# 從集合的右側(尾部)移除一個成員,並將其返回
srandmember(name, numbers)
# 從name對應的集合中隨機獲取 numbers 個元素
srem(name, values)
# 在name對應的集合中刪除某些值
sunion(keys, *args)
# 獲取多一個name對應的集合的並集
sunionstore(dest,keys, *args)
# 獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
# 同字符串的操作,用於增量迭代分批獲取元素,避免內存消耗太大
有序集合,在集合的基礎上,為每元素排序;元素的排序需要根據另外一個值來進行比較,所以,對於有序集合,每一個元素有兩個值,即:值和分數,分數專門用來做排序。
zadd(name, *args, **kwargs)
# 在name對應的有序集合中添加元素
# 如:
# zadd('zz', 'n1', 1, 'n2', 2)
# 或
# zadd('zz', n1=11, n2=22)
zcard(name)
# 獲取name對應的有序集合元素的數量
zcount(name, min, max)
# 獲取name對應的有序集合中分數 在 [min,max] 之間的個數
zincrby(name, value, amount)
# 自增name對應的有序集合的 name 對應的分數
r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
# 按照索引范圍獲取name對應的有序集合的元素
# 參數:
# name,redis的name
# start,有序集合索引起始位置(非分數)
# end,有序集合索引結束位置(非分數)
# desc,排序規則,默認按照分數從小到大排序
# withscores,是否獲取元素的分數,默認只獲取元素的值
# score_cast_func,對分數進行數據轉換的函數
# 更多:
# 從大到小排序
# zrevrange(name, start, end, withscores=False, score_cast_func=float)
# 按照分數范圍獲取name對應的有序集合的元素
# zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
# 從大到小排序
# zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)
zrank(name, value)
# 獲取某個值在 name對應的有序集合中的排行(從 0 開始)
# 更多:
# zrevrank(name, value),從大到小排序
zrangebylex(name, min, max, start=None, num=None)
# 當有序集合的所有成員都具有相同的分值時,有序集合的元素會根據成員的 值 (lexicographical ordering)來進行排序,而這個命令則可以返回給定的有序集合鍵 key 中, 元素的值介於 min 和 max 之間的成員
# 對集合中的每個成員進行逐個字節的對比(byte-by-byte compare), 並按照從低到高的順序, 返回排序后的集合成員。 如果兩個字符串有一部分內容是相同的話, 那么命令會認為較長的字符串比較短的字符串要大
# 參數:
# name,redis的name
# min,左區間(值)。 + 表示正無限; - 表示負無限; ( 表示開區間; [ 則表示閉區間
# min,右區間(值)
# start,對結果進行分片處理,索引位置
# num,對結果進行分片處理,索引后面的num個元素
# 如:
# ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
# r.zrangebylex('myzset', "-", "[ca") 結果為:['aa', 'ba', 'ca']
# 更多:
# 從大到小排序
# zrevrangebylex(name, max, min, start=None, num=None)
zrem(name, values)
# 刪除name對應的有序集合中值是values的成員
# 如:zrem('zz', ['s1', 's2'])
zremrangebyrank(name, min, max)
# 根據排行范圍刪除
zremrangebyscore(name, min, max)
# 根據分數范圍刪除
zremrangebylex(name, min, max)
# 根據值返回刪除
zscore(name, value)
# 獲取name對應有序集合中 value 對應的分數
zinterstore(dest, keys, aggregate=None)
# 獲取兩個有序集合的交集,如果遇到相同值不同分數,則按照aggregate進行操作
# aggregate的值為: SUM MIN MAX
zunionstore(dest, keys, aggregate=None)
# 獲取兩個有序集合的並集,如果遇到相同值不同分數,則按照aggregate進行操作
# aggregate的值為: SUM MIN MAX
zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)
# 同字符串相似,相較於字符串新增score_cast_func,用來對分數進行操作
其他常用操作
delete(*names)
# 根據刪除redis中的任意數據類型
exists(name)
# 檢測redis的name是否存在
keys(pattern='*')
# 根據模型獲取redis的name
# 更多:
# KEYS * 匹配數據庫中所有 key 。
# KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
# KEYS h*llo 匹配 hllo 和 heeeeello 等。
# KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name ,time)
# 為某個redis的某個name設置超時時間
rename(src, dst)
# 對redis的name重命名為
move(name, db))
# 將redis的某個值移動到指定的db下
randomkey()
# 隨機獲取一個redis的name(不刪除)
type(name)
# 獲取name對應值的類型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
# 同字符串操作,用於增量迭代獲取key
4、管道
redis-py默認在執行每次請求都會創建(連接池申請連接)和斷開(歸還連接池)一次連接操作,如果想要在一次請求中指定多個命令,則可以使用pipline實現一次請求指定多個命令,並且默認情況下一次pipline 是原子性操作。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
r = redis.Redis(connection_pool=pool)
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
r.set('name', 'alex')
r.set('role', 'sb')
pipe.execute()
5. 發布訂閱
cat redis_helper.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
class RedisHelper:
def __init__(self):
self.__conn = redis.Redis(host='10.211.55.4')
self.chan_sub = 'fm104.5'
self.chan_pub = 'fm104.5'
def public(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True
def subscriber(self):
pub = self.__conn.pubsub()
pub.subscribe(self.chan_sub)
pub.parse_response()
return pub
訂閱者:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from redis_helper import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscriber()
while True:
msg= redis_sub.parse_response()
print msg
發布者:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from redis_helper import RedisHelper
obj = RedisHelper()
obj.public('hello')
更多參見:https://github.com/andymccurdy/redis-py/
http://doc.redisfans.com/
##################################################################################
RabbitMQ
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
install
http://www.rabbitmq.com/install-debian.html
wget http://www.rabbitmq.com//releases/rabbitmq-server/v3.6.1/rabbitmq-server_3.6.1-1_all.deb
apt-get install erlang
dpkg -i rabbitmq-server_3.6.1-1_all.deb
pip3 install pika
生產者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat produce.py
#!/usr/bin/env python
import pika,time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',routing_key='hello',body='at %s Hello World!'%time.strftime("%Y-%m-%d %H:%M:%S"))
print("Sent 'hello world!'")
connection.close()
消費者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat consume.py
#!/usr/bin/env python
import pika,time
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
'''once we restart rabbitmq server, the queue will disappear, so if we execute consume.py before produce.py, will return error: no queue hello -- > if we comment the upper declare sentence,because queue hello has gone, produce.py does not execute, below basic_consume already uses queue hello, then you will see the error.'''
def callback(ch,method,properties,body):
print("at %s received %r" %(time.strftime("%Y-%m-%d %H:%M:%S"),body))
#print("ch: %s, method: %s, properties: %s" %(ch,method,properties))
'''ch: <pika.adapters.blocking_connection.BlockingChannel object at 0x7f0a53268208>,
method: <Basic.Deliver(['consumer_tag=ctag1.08f318df4d11438b98e7ab9a20d5721d', 'deliv ery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>,
properties: <BasicProperties>'''
channel.basic_consume(callback,queue='hello',no_ack=True)
print("waiting for message. press ctrl+c to exit.")
channel.start_consuming()
消費者no_ack=False:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat consumeNo_ackFalse.py
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch,method,properties,body):
print("at %s received %s" %(time.strftime("%Y-%m-%d %H:%M:%S"),body))
print('method,',method)
print('properties,',properties)
#time.sleep(10)
#print("at %s ok" %time.strftime("%Y-%m-%d %H:%M:%S"))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,queue='hello',no_ack=False)
print("waiting for messages. press ctrl+c to exit.")
channel.start_consuming()
消費者prefetch_count=1:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat consumePrefetch.py
#!/usr/bin/env python
import pika,time
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch,method,properties,body):
print("at %s received %r" %(time.strftime("%Y-%m-%d %H:%M:%S"),body))
#print("ch: %s, method: %s, properties: %s" %(ch,method,properties))
'''ch: <pika.adapters.blocking_connection.BlockingChannel object at 0x7f0a53268208>,
method: <Basic.Deliver(['consumer_tag=ctag1.08f318df4d11438b98e7ab9a20d5721d', 'deliv ery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello'])>,
properties: <BasicProperties>'''
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,queue='hello',no_ack=True)
print("waiting for message. press ctrl+c to exit.")
channel.start_consuming()
生產者 持久型queue:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat produceDurable.py
#!/usr/bin/env python
'''remember after executing this script, restart rabbitmq server, then execute consumeDurable.py see if can still receive the message'''
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello1',durable=True)
channel.basic_publish(exchange='',routing_key='hello1',body='at %s Hello World!'%time.strftime("%Y-%m-%d %H:%M:%S"),properties=pika.BasicProperties(delivery_mode=2,))
print("sent 'Hello World!'")
connection.close()
消費者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat consumeDurable.py
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel= connection.channel()
#channel.queue_declare(queue='hello1',durable=True)
'''here we don't even to declare the queue name if we declared once.after restart rabbitmq server, we don't need to declare here either.'''
def callback(ch,method,properties,body):
print("at %s received %s"%(time.strftime("%Y-%m-%d %H:%M:%S"),body))
print('properties,',properties)
print('method,',method)
channel.basic_consume(callback,queue='hello1',no_ack=True)
print("waiting for messages. press ctrl+c to exit.")
channel.start_consuming()
發布者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat publish.py
#!/usr/bin/env python
'''pub and sub is not like simple queue. simple queue is like you come,you get this message,next time another guy comes, he gets another message. pub and sub is like everyone can receive the same messages. because it is going to send message to all queues. and sub should execute before pub, otherwise sub will miss the message if pub sent already. subscribe.py will generate random queue names.'''
import pika
import sys
import time
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='logs',type='fanout')
message=' '.join(sys.argv[1:]) or 'at %s Info:hello world.'%time.strftime("%Y-%m-%d %H:%M:%S")
channel.basic_publish(exchange='logs',routing_key='',body=message)
print("sent %r " %message)
connection.close()
訂閱者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat subscribe.py
#!/usr/bin/env python
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',queue=queue_name)
print('waiting for messages. press ctrl+c to exit.')
def callback(ch,method,properties,body):
print(" received %r" % body)
print("queue name: ", queue_name)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
發布者exchange type=direct:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat publishDirect.py
#!/usr/bin/env python
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'hello world'
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print("sent %r %r" %(severity, message))
connection.close()
訂閱者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat subscribeDirect.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel=connection.channel()
channel.exchange_declare(exchange='direct_logs',type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("usage: %s info warning error \n" % sys.argv[0])
sys.exit()
for severity in severities:
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)
''' this routing_key should be the same with publishDirect.py's routing_key,otherwise won't receive the message.'''
print("waiting for messages. press ctrl+c to exit.")
def callback(ch,method,properties,body):
print("%s %s %s" %( method.routing_key,body,queue_name))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
發布者exchange type=topic:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat publishTopic.py
#!/usr/bin/env python
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anno.info'
message = ' '.join(sys.argv[2:]) or 'Hello world'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
'''exchange type is 'topic', obscure match. if the sender routing key is ab, the receiver routing key sets to ab.# can receive, but not ab.*'''
print("sent %r %r " % (routing_key,message))
connection.close()
訂閱者:
user@MyUbuntuVM:~/pythonScripts/rabbitmq$ cat subscribeTopic.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("usage: %s binding_key ...\n" %sys.argv[0])
sys.exit()
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)
'''exchange type is 'topic', obscure match. if the sender routing key is ab, the receiver r
outing key sets to ab.# can receive, but not ab.*'''
print("waiting for message. press ctrl+c to exit.")
def callback(ch,method,properties,body):
print("%s %s " %(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()