統計在線用戶的數量,是應用很常見的需求了。如果需要精准的統計到用戶是在線,離線狀態,我想只有客戶端和服務器通過保持一個TCP長連接來實現。如果應用本身並非一個IM應用的話,這種方式成本極高。
現在的應用都趨向於使用心跳包來標識用戶是否在線。用戶登錄后,每隔一段時間,往服務器推送一個消息,表示當前用戶在線。服務器則可以定義一個時間差,例如:5分鍾內收到過客戶端心跳消息,視為在線用戶。
在線用戶統計的實現
基於數據庫實現
最簡單的辦法,就是在用戶表,添加一個最后心跳包的日期時間字段 last_active
。服務器收到心跳后,每次都去更新這個字段為當前的最新時間。
如果要查詢最近5分鍾活躍的用戶數量,就可以簡單的通過一句SQL完成。
SELECT COUNT(1) AS `online_user_count` FROM `user` WHERE `last_active` BETWEEN '2020-12-22 13:00:00' AND '020-12-22 13:05:00';
弊端也是顯而易見,為了提高檢索效率,不得不為last_active
字段添加索引,而因為心跳的更新,會導致頻繁的重新維護索引樹,效率極其低下。
基於Redis實現
這是比較理想的一種實現方式了,Redis基於內存進行讀寫,性能自然比關系型數據庫好得多,而且它所提供的Zset可以很方便的構建出一個在線用戶的統計服務。
Redis的Zset
這里不會涉及太多redis的東西,簡單說明以下zset
。它是一個有序的set
集合,集合中的每個元素由2個東西組成
- member 既然是集合,那么它便是集合中的元素,並且不能重復
- score 既然是有序的,它就是用於排序的權重字段
Zset的部分操作
添加元素
ZADD key score member [score member ...]
一次性添加一個或者多個元素到集合,如果member
已經存在則會使用當前score
進行覆蓋
統計所有的元素數量
ZCARD key
統計score值在min和max之間元素數量
ZCOUNT key min max
刪除score值在min和max之間的元素
ZREMRANGEBYSCORE key min max
一個示例
我打算,用一個zset
存儲我內心中編程語言的評分排名,這個key叫做lang
添加信息,返回新添加的元素個數
> zadd lang 999 php 10 java 9 go 8 python 7 javascript
"5"
查看添加的數量
> zcard lang
"5"
查看評分在8 - 10之間的元素個數,有3個
> zcount lang 8 10
"3"
刪除評分在8 - 1000的元素,返回刪除的個數
> ZREMRANGEBYSCORE lang 8 1000
"4"
在線用戶服務的實現
知道了zset
后,就可以實現一個在線用戶的統計服務了。
實現思路
客戶端每隔5分鍾發送一個心跳到服務器,服務器根據會話獲取到用戶的ID,作為zset
的member
存入zset
,score
便是當前收到心跳的時間戳,當同一個用戶第二次發送心跳的時候,就會更新他對應的score
值,由於更新是在內存,這個速度相當快。
zadd users 1608616915109 10000
需要統計出在線用戶的數量,本質上就是需要統計出,最近5分鍾有發送心跳的用戶,通過zcount
可以很輕松的統計出來。通過程序獲取到當前的時間戳,作為maxScore
,時間戳減去5分鍾后作為minScore
。
zcount users 1608616615109 1608616915109
因為某些用戶可能長時間沒有登錄過了,可以通過ZREMRANGEBYSCORE
進行清理。通過程序獲取到當前的時間戳,減去5分鍾后作為maxScore
,使用0
, 作為minScore
,表示清理所有超過5分鍾沒有發送過心跳包的用戶。
ZREMRANGEBYSCORE users 0 1608616615109
實現代碼
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import javax.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
*
*
* 在線用戶統計
*
* @author Administrator
*
*/
@Component
public class OnlineUserStatsService {
private static final String ONLINE_USERS = "onlie_users";
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 添加用戶在線信息
* @param userId
* @return
*/
public Boolean online(Integer userId) {
return this.stringRedisTemplate.opsForZSet().add(ONLINE_USERS, userId.toString(), Instant.now().toEpochMilli());
}
/**
* 獲取一定時間內,在線的用戶數量
* @param duration
* @return
*/
public Long count(Duration duration) {
LocalDateTime now = LocalDateTime.now();
return this.stringRedisTemplate.opsForZSet().count(ONLINE_USERS,
now.minus(duration).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(),
now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
}
/**
* 獲取所有在線過的用戶數量,不論時間
* @return
*/
public Long count() {
return this.stringRedisTemplate.opsForZSet().zCard(ONLINE_USERS);
}
/**
* 清除超過一定時間沒在線的用戶數據
* @param duration
* @return
*/
public Long clear(Duration duration) {
return this.stringRedisTemplate.opsForZSet().removeRangeByScore(ONLINE_USERS, 0,
LocalDateTime.now().minus(duration).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
}
}
使用示例
@Resource
private OnlineUserStatsService onlineUserStatsService;
@Test
public void test() {
// ID為1的用戶發送了心跳包
boolean result = this.onlineUserStatsService.online(1);
System.out.println("online=" + result);
// 獲取5分鍾內,發送過心跳包的用戶數量,也就是在線用戶的數量
Long count = this.onlineUserStatsService.count(Duration.ofMinutes(5));
System.out.println("oneline count=" + count);
// 獲取所有發送過心跳包的用戶數量
count = this.onlineUserStatsService.count();
System.out.println("all count=" + count);
// 清除超過1天都沒發送過心跳包的用戶
Long clear = this.onlineUserStatsService.clear(Duration.ofDays(1));
System.out.println("clear=" + clear);
}
內存消耗分析
可以通過 http://www.redis.cn/redis_memory/ 預算Redis的內存消耗
我對Redis的內存分配並不熟悉,只是按照自己的想法去填寫了一些數據,所以我在這里理解的東西,可能是錯誤的。但是我想這並不耽誤證明 - 在這種場景使用Zset對內存消耗極低的事實
設想onlie_users
需要存儲1億個用戶的狀態信息,每個元素score
和member
需要10個字節存儲,那么一共大約需要20G內存。20G的內存對於現在的服務器來說,並不是大問題。
最后
- 心跳協議不一定非要HTTP,如果客戶端支持的話UDP就很適合,可以節約一些系統開銷。
zset
的key,不一定非要用String
,可以修改序列化方式,以固定的字節的形式存儲用戶ID,在用戶ID過大的時候,可以節約一些存儲空間。
String userId = "10010";
System.out.println(userId.getBytes().length); // 以字符串形式存儲 => 需要5個字節
byte[] bin = ByteBuffer.allocate(4).putInt(Integer.valueOf(userId)).array();
System.out.println(bin.length); // 序列化為字節形式存儲 => 需要4個字節
System.out.println(ByteBuffer.wrap(bin).getInt()); // 反序列化為ID => 10010