RabbitMQ隊列/Redis緩存


一、RabbitMQ隊列

RabbitMQ安裝(Centos7安裝):
1、安裝依賴:
yum install socat (不安裝會報錯)
2、下載rpm包:
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
3、安裝軟件包:
rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.6-1.el7.noarch.rpm 
4、啟動RabbitMQ:
systemctl start rabbit-server
systemctl status rabbit-server
systemctl enable rabbit-server

1、Python的RabbitMQ操作模塊pika:

send端:

import pika
import time

credentials = pika.PlainCredentials('cheng','cheng')   
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
#聲明queue channel.queue_declare(queue
='hello') for i in range(10): channel.basic_publish(exchange='', routing_key='hello', body="hello people %s"%i) time.sleep(2) print("is ok send !") connect.close()

receive端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.queue_declare(queue
='hello') def callback(ch,method,properties,body): body = body.decode() print("is ok ,%s"%body) channel.basic_consume(callback, queue='hello', no_ack=True) print('is wait') channel.start_consuming()

2、隊列持久化:

上述代碼在服務端宕了之后,消息會丟失,以下是讓隊列持久化的代碼:

send端:
import pika,time credentials = pika.PlainCredentials('cheng','cheng') connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials)) channel = connect.channel() channel.queue_declare(queue='pass',durable=True) #增加:durable for i in range(10): channel.basic_publish(exchange = '', body = 'hello %s!!!!'%i, routing_key = 'pass', properties = pika.BasicProperties( delivery_mode=2 )) #增加properties time.sleep(2) print("is send ok") connect.close()

 

receive端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.queue_declare(queue='pass',durable=True)
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)     #給發送端發出收到確認,沒有這個的話receive端沒處理完,就把消息刪除了
    print(body.decode())

channel.basic_consume(callback,
                      queue='pass')                     #去掉no_ack
print("is wait")
channel.start_consuming()

3、消息公平分發:

如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.queue_declare(queue='pass',durable=True)
def callback(ch,method,properties,body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())

channel.basic_qos(prefetch_count=1)    #增加perfetch_count=1
channel.basic_consume(callback,
                      queue='pass')
print("is wait")
channel.start_consuming()

4、Publish\Subscribe(消息發布\訂閱) 

之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了。

Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息:

fanout:  所有bind到此exchange的queue都可以接收消息
direct:   通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:    所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

headers: 通過headers 來決定把消息發給哪些queue

綁定此exchange的queue可接受消息:

Send端:

import pika ,time
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.exchange_declare(exchange='log',
                         type='fanout')
for i in range(10):
    channel.basic_publish(exchange='log',
                          routing_key='',
                          body = 'hello %s'%i)
    time.sleep(2)
print("send is ok")
connect.close()

Receice端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.exchange_declare(exchange='log',
                         type='fanout')
result = channel.queue_declare(exclusive=True,queue='')    #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_name = result.method.queue
channel.queue_bind(exchange='log',
                   queue=queue_name)
print("is wait !")
def callback(ch,method,properties,body):
    print(body.decode())
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

 Direct有選擇接收消息:

Send端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.exchange_declare(exchange='logs',
                         type='direct')

#key='na'
key= 'name'    #接收端會根據不同的key來接收

channel.basic_publish(exchange='logs',
                        routing_key=key,
                        body = 'hello ')

print("send is ok")
connect.close()

Receive端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.exchange_declare(exchange='logs',
                         type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

key = ['name','age','height']  #key是這個列表的內容消息都會接收
for i in key:
    channel.queue_bind(exchange='logs',
                       queue=queue_name,
                       routing_key=i)
print("is wait !")
def callback(ch,method,properties,body):
    print(body.decode())
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

Topic更細致得選擇接收消息:

 表達式符號說明:#代表一個或多個字符,*代表任何字符
     例:#.a會匹配a.a,aa.a,aaa.a等
           *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout

Send端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.exchange_declare(exchange='log1',
                         type='topic')


key= 'age.sdasfdf'

channel.basic_publish(exchange='log1',
                        routing_key=key,
                        body = 'hello ')

print("send is ok")
connect.close()

Receive端:

import pika
credentials = pika.PlainCredentials('cheng','cheng')
connect = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.71.3',port=5672,credentials=credentials))
channel = connect.channel()
channel.exchange_declare(exchange='log1',
                         type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

key = ['*.name','age.*','#.height']   #*.name以.name結尾的key,age.*以age.開頭的key,但是*name就不可以,去掉.就不行了,不清楚怎么回事。
for i in key:
    channel.queue_bind(exchange='log1',
                       queue=queue_name,
                       routing_key=i)
print("is wait !")
def callback(ch,method,properties,body):
    print(body.decode())
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

RPC:

RPC client

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

RPC server

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
 
channel = connection.channel()
 
channel.queue_declare(queue='rpc_queue')
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()

 二、Redis緩存

Centos安裝Redis:

1、下載安裝包:
/usr/local/src
wget http://download.redis.io/releases/redis-4.0.1.tar.gz
tar zxvf redis-4.0.1.tar.gz
2、安裝:
cd redis-4.0.1
make 
cd src/
make install PREFIX=/usr/local/redis
3、復制配置文件:
mkdir /usr/local/redis/etc/
cp redis.conf /usr/local/redis/etc/
mv redis.conf /usr/local/redis/etc/
4、修改配置文件:
cd /usr/local/redis/etc/
vi redis.conf 
將daemonize的值改為yes
5、啟動redis:
cd /usr/local/redis/bin/
 ./redis-server /usr/local/redis/etc/redis.conf 
ps -ef |grep redis
./redis-cli 
注:根據上面的配置還不能實現遠程的連接,需要配置requirepass字段,后面加密碼,還有注釋點bind字段,具體的配置見:

1、Python操作Redis

 連接redis:

import redis
r= redis.Redis(host='192.168.71.3',port=6379,db=4,password='zedata')
r.set('name','cheng')
print(r.get('name'))

用連接池來連接,避免每次建立、釋放連接的開銷。

import redis
pool = redis.ConnectionPool(host='192.168.71.3',port=6379,db=4,password='zedata')
conn =redis.Redis(connection_pool=pool)
conn.set('name','cheng')
print(conn.get('name'))

1)、String操作:

redis中的String在在內存中按照一個name對應一個value來存儲:

set(name, value, ex=None, px=None, nx=False, xx=False)

在Redis中設置值,默認,不存在則創建,存在則修改
參數:
     ex,過期時間(秒)
     px,過期時間(毫秒)
     nx,如果設置為True,則只有name不存在時,當前set操作才執行
     xx,如果設置為True,則只有name存在時,當前set操作才執行

setnx(name, value)

設置值,只有name不存在時,執行設置操作(添加)

setex(name, value, time)

設置值
參數:
    time,過期時間(數字秒 或 timedelta對象) 

psetex(name, time_ms, value)

設置值
參數:
    time_ms,過期時間(數字毫秒 或 timedelta對象

mset(*args, **kwargs)

批量設置值
如:
    mset(k1='v1', k2='v2')
    或
    mget({'k1': 'v1', 'k2': 'v2'})

get(name)

獲取值

mget(keys, *args)

批量獲取
如:
    mget('ylr', 'wupeiqi')
    或
    r.mget(['ylr', 'wupeiqi'])

getset(name, value)

設置新值並獲取原來的值

getrange(key, start, end)

獲取子序列(根據字節獲取,非字符)
參數:
    name,Redis 的 name
    start,起始位置(字節)
    end,結束位置(字節)
如: "魑魅魍魎" ,0-3表示 ""

setrange(name, offset, value)

修改字符串內容,從指定字符串索引開始向后替換(新值太長時,則向后添加)
參數:
    offset,字符串的索引,字節(一個漢字三個字節,一個字符一個字節)
    value,要設置的值

setbit(name, offset, value)

# 對name對應值的二進制表示的位進行操作
 
# 參數:
    # name,redis的name
    # offset,位的索引(將值變換成二進制后再進行索引)
    # value,值只能是 1 或 0
 
# 注:如果在Redis中有一個對應: n1 = "foo",
        那么字符串foo的二進制表示為:01100110 01101111 01101111
    所以,如果執行 setbit('n1', 7, 1),則就會將第7位設置為1,
        那么最終二進制則變成 01100111 01101111 01101111,即:"goo"
 
# 擴展,轉換二進制表示:
 
    # source = "武沛齊"
    source = "foo"
 
    for i in source:
        num = ord(i)
        print bin(num).replace('b','')
 
    特別的,如果source是漢字 "武沛齊"怎么辦?
    答:對於utf-8,每一個漢字占 3 個字節,那么 "武沛齊" 則有 9個字節
       對於漢字,for循環時候會按照 字節 迭代,那么在迭代時,將每一個字節轉換 十進制數,然后再將十進制數轉換成二進制
        11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
        -------------------------- ----------------------------- -----------------------------
                    武                         沛                           齊

*用途舉例,用最省空間的方式,存儲在線用戶數及分別是哪些用戶在線

 

getbit(name, offset)

獲取name對應的值的二進制表示中的某位的值 (0或1)

bitcount(key, start=None, end=None)

獲取name對應的值的二進制表示中 1 的個數
參數:
    key,Redis的name
    start,位起始位置
    end,位結束位置

strlen(name)

返回name對應值的字節長度(一個漢字3個字節)

incr(self, name, amount=1)

自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。
 
參數:
     name,Redis的name
     amount,自增數(必須是整數)
 
 注:同incrby

incrbyfloat(self, name, amount=1.0)

自增 name對應的值,當name不存在時,則創建name=amount,否則,則自增。
 
參數:
    name,Redis的name
    amount,自增數(浮點型)

decr(self, name, amount=1)

自減 name對應的值,當name不存在時,則創建name=amount,否則,則自減。
 
參數:
    name,Redis的name
    amount,自減數(整數)

append(key, value)

在redis name對應的值后面追加內容
 
參數:
    key, redis的name
    value, 要追加的字符串

 2)、Hash操作

hash表現形式上有些像pyhton中的dict,可以存儲一組關聯性較強的數據 , redis中Hash在內存中的存儲格式如下圖:

hset(name, key, value)

name對應的hash中設置一個鍵值對(不存在,則創建;否則,修改)
 
參數:
    name,redis的name
    key,name對應的hash中的key
    value,name對應的hash中的value
 
 注:
    hsetnx(name, key, value),當name對應的hash中不存在當前key時則創建(相當於添加)

hmset(name, mapping)

在name對應的hash中批量設置鍵值對
 
參數:
    name,redis的name
    mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
 
如:
    r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

hget(name,key)

在name對應的hash中獲取根據key獲取value

hmget(name, keys, *args)

在name對應的hash中獲取多個key的值
 
參數:
    name,reids對應的name
    keys,要獲取key集合,如:['k1', 'k2', 'k3']
    *args,要獲取的key,如:k1,k2,k3
 
如:
    r.mget('xx', ['k1', 'k2'])
    或
    print r.hmget('xx', 'k1', 'k2')

hgetall(name)

獲取name對應hash的所有鍵值

hlen(name)

獲取name對應的hash中鍵值對的個數

hkeys(name)

獲取name對應的hash中所有的key的值

hvals(name)

獲取name對應的hash中所有的value的值

hexists(name, key)

檢查name對應的hash是否存在當前傳入的key

hdel(name,*keys)

將name對應的hash中指定key的鍵值對刪除

hincrby(name, key, amount=1)

自增name對應的hash中的指定key的值,不存在則創建key=amount
參數:
    name,redis中的name
    key, hash對應的key
    amount,自增數(整數)

hincrbyfloat(name, key, amount=1.0)

自增name對應的hash中的指定key的值,不存在則創建key=amount
 
參數:
    name,redis中的name
    key, hash對應的key
    amount,自增數(浮點數)
 
自增name對應的hash中的指定key的值,不存在則創建key=amount

hscan(name, cursor=0, match=None, count=None)

Start a full hash scan with:

HSCAN myhash 0

Start a hash scan with fields matching a pattern with:

HSCAN myhash 0 MATCH order_*

Start a hash scan with fields matching a pattern and forcing the scan command to do more scanning with:

HSCAN myhash 0 MATCH order_* COUNT 1000

增量式迭代獲取,對於數據大的數據非常有用,hscan可以實現分片的獲取數據,並非一次性將數據全部獲取完,從而防止內存被撐爆
 
參數:
    name,redis的name
    cursor,游標(基於游標分批取獲取數據)
    match,匹配指定key,默認None 表示所有的key
    count,每次分片最少獲取個數,默認None表示采用Redis的默認分片個數
 
如:
    第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
    第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
    ...
    直到返回值cursor的值為0時,表示數據已經通過分片獲取完畢
 

hscan_iter(name, match=None, count=None)

利用yield封裝hscan創建生成器,實現分批去redis中獲取數據
  
參數:
    match,匹配指定key,默認None 表示所有的key
    count,每次分片最少獲取個數,默認None表示采用Redis的默認分片個數
  
如:
    for item in r.hscan_iter('xx'):
        print item

3)、list 

List操作,redis中的List在在內存中按照一個name對應一個List來存儲。如圖:

lpush(name,values)

在name對應的list中添加元素,每個新的元素都添加到列表的最左邊
 
如:
    r.lpush('oo', 11,22,33)
    保存順序為: 33,22,11
 
擴展:
    rpush(name, values) 表示從右向左操作

lpushx(name,value)

在name對應的list中添加元素,只有name已經存在時,值添加到列表的最左邊
 
更多:
    rpushx(name, value) 表示從右向左操作

llen(name)

 name對應的list元素的個數

linsert(name, where, refvalue, value))

在name對應的列表的某一個值前或后插入一個新值
 
參數:
    name,redis的name
    where,BEFORE或AFTER
    refvalue,標桿值,即:在它前后插入數據
    value,要插入的數據

r.lset(name, index, value)

對name對應的list中的某一個索引位置重新賦值
 
參數:
    name,redis的name
    index,list的索引位置
    value,要設置的值

r.lrem(name, value, num)

在name對應的list中刪除指定的值
 
參數:
    name,redis的name
    value,要刪除的值
    num,  num=0,刪除列表中所有的指定值;
           num=2,從前到后,刪除2個;
           num=-2,從后向前,刪除2個

lpop(name)

在name對應的列表的左側獲取第一個元素並在列表中移除,返回值則是第一個元素
 
更多:
    rpop(name) 表示從右向左操作

lindex(name, index)

在name對應的列表中根據索引獲取列表元素

lrange(name, start, end)

在name對應的列表分片獲取數據
參數:
    name,redis的name
    start,索引的起始位置
    end,索引結束位置

ltrim(name, start, end)

在name對應的列表中移除沒有在start-end索引之間的值
參數:
    name,redis的name
    start,索引的起始位置
    end,索引結束位置

rpoplpush(src, dst)

從一個列表取出最右邊的元素,同時將其添加至另一個列表的最左邊
參數:
    src,要取數據的列表的name
    dst,要添加數據的列表的name

blpop(keys, timeout)

將多個列表排列,按照從左到右去pop對應列表的元素
 
參數:
    keys,redis的name的集合
    timeout,超時時間,當元素所有列表的元素獲取完之后,阻塞等待列表內有數據的時間(秒), 0 表示永遠阻塞
 
更多:
    r.brpop(keys, timeout),從右向左獲取數據

brpoplpush(src, dst, timeout=0)

從一個列表的右側移除一個元素並將其添加到另一個列表的左側
 
參數:
    src,取出並要移除元素的列表對應的name
    dst,要插入元素的列表對應的name
    timeout,當src對應的列表中沒有數據時,阻塞等待其有數據的超時時間(秒),0 表示永遠阻塞 

4)、集合操作,Set集合就是不允許重復的列表。

sadd(name,values)

name對應的集合中添加元素

scard(name)

獲取name對應的集合中元素個數

sdiff(keys, *args)

在第一個name對應的集合中且不在其他name對應的集合的元素集合

sdiffstore(dest, keys, *args)

 獲取第一個name對應的集合中且不在其他name對應的集合,再將其新加入到dest對應的集合中

sinter(keys, *args)

獲取多一個name對應集合的並集

sinterstore(dest, keys, *args)

獲取多一個name對應集合的並集,再講其加入到dest對應的集合中

sismember(name, value)

檢查value是否是name對應的集合的成員

smembers(name)

獲取name對應的集合的所有成員

smove(src, dst, value)

將某個成員從一個集合中移動到另外一個集合

spop(name)

從集合的右側(尾部)移除一個成員,並將其返回

srandmember(name, numbers)

從name對應的集合中隨機獲取 numbers 個元素

srem(name, values)

 在name對應的集合中刪除某些值

sunion(keys, *args)

獲取多一個name對應的集合的並集

sunionstore(dest,keys, *args)

獲取多一個name對應的集合的並集,並將結果保存到dest對應的集合中

sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)

同字符串的操作,用於增量迭代分批獲取元素,避免內存消耗太大

有序集合,在集合的基礎上,為每元素排序;元素的排序需要根據另外一個值來進行比較,所以,對於有序集合,每一個元素有兩個值,即:值和分數,分數專門用來做排序。

zadd(name, *args, **kwargs)

在name對應的有序集合中添加元素
如:
     zadd('zz', 'n1', 1, 'n2', 2)
     或
     zadd('zz', n1=11, n2=22)

zcard(name)

獲取name對應的有序集合元素的數量

zcount(name, min, max)

獲取name對應的有序集合中分數 在 [min,max] 之間的個數

zincrby(name, value, amount)

自增name對應的有序集合的 name 對應的分數

r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)

按照索引范圍獲取name對應的有序集合的元素
 
參數:
    name,redis的name
    start,有序集合索引起始位置(非分數)
    end,有序集合索引結束位置(非分數)
    desc,排序規則,默認按照分數從小到大排序
    withscores,是否獲取元素的分數,默認只獲取元素的值
    score_cast_func,對分數進行數據轉換的函數
 
更多:
    從大到小排序
    zrevrange(name, start, end, withscores=False, score_cast_func=float)
 
    按照分數范圍獲取name對應的有序集合的元素
    zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
    從大到小排序
    zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

zrank(name, value)

獲取某個值在 name對應的有序集合中的排行(從 0 開始)
 
更多:
    zrevrank(name, value),從大到小排序

zrem(name, values)

刪除name對應的有序集合中值是values的成員
 
如:zrem('zz', ['s1', 's2'])

zremrangebyrank(name, min, max)

根據排行范圍刪除

zremrangebyscore(name, min, max)

根據分數范圍刪除

zscore(name, value)

 獲取name對應有序集合中 value 對應的分數

zinterstore(dest, keys, aggregate=None)

獲取兩個有序集合的交集,如果遇到相同值不同分數,則按照aggregate進行操作
aggregate的值為:  SUM  MIN  MAX

zunionstore(dest, keys, aggregate=None)

獲取兩個有序集合的並集,如果遇到相同值不同分數,則按照aggregate進行操作
aggregate的值為:  SUM  MIN  MAX

zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
zscan_iter(name, match=None, count=None,score_cast_func=float)

同字符串相似,相較於字符串新增score_cast_func,用來對分數進行操作

 5、其他常用操作

delete(*names)

根據刪除redis中的任意數據類型

exists(name)

 檢測redis的name是否存在

keys(pattern='*')

根據模型獲取redis的name
 
更多:
    KEYS * 匹配數據庫中所有 key 。
    KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
    KEYS h*llo 匹配 hllo 和 heeeeello 等。
    KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

expire(name ,time)

為某個redis的某個name設置超時時間

rename(src, dst)

對redis的name重命名為

move(name, db))

將redis的某個值移動到指定的db下

randomkey()

 隨機獲取一個redis的name(不刪除)

type(name)

獲取name對應值的類型

scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)

同字符串操作,用於增量迭代獲取key

 2、管道

redis-py默認在執行每次請求都會創建(連接池申請連接)和斷開(歸還連接池)一次連接操作,如果想要在一次請求中指定多個命令,則可以使用pipline實現一次請求指定多個命令,並且默認情況下一次pipline 是原子性操作。

import redis
pool = redis.ConnectionPool(host='192.168.71.3',port=6379,password='zedata')
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
pipe.set('name','cheng')
pipe.set('age','24')
pipe.execute()

3、發布訂閱

轉發:

import redis


class RedisHelper:

    def __init__(self):
        self.__conn = redis.Redis(host='10.211.55.4')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub

訂閱:

from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe()
 
while True:
    msg= redis_sub.parse_response()
    print msg

發布:

from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public('hello')

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM