日活百萬以內,統計用戶新增活躍留存方案


# 背景

產品上線后,出於運營的需要,我們要對用戶進行跟蹤,分析用戶數據。本文要介紹的是如何統計用戶新增數、活躍數和留存率,時間跨度是天,即統計每日新增(DNU),日活(DAU)和某日新增的一批用戶在接下來的一段時間內每天活躍的百分比。

 

# 使用范圍

本方案適用於用戶量不太大(日活在百萬以內,日活百萬以上不是不能用,只是在統計數據時耗時太長不太合適),尤其適合小團隊或個人開發者(比如你公司服務端接口開發是你,運維也是你,現在老板又來叫你做數據報表)。如果你的產品的日活有幾百萬甚至幾千萬或過億,這樣的產品當然是完全可以養一個大數據部門的,本方案並不適用這種情況。

 

# 涉及到的工具和技術點

shell腳本

本方案需要你懂一點兒shell,起碼能看懂,也要求你知道怎么寫crontab定時任務。

MySQL

本方案需要你熟練使用sql,知道怎么定義存儲過程,知道分區表的概念和用法。

 

# 實現過程

1.目標

由於數據量是不斷增加的,所以我們的目標是要把原始數據聚合成一張可以直接用一條select語句就可以查看每日新增、日活和留存率的表,並且只能做單表查詢,否則當數據量增大時,聯表查詢的速度會大大下降。而且為了防止出錯,我們的數據還需要可以重跑但是不會影響到已存在的數據。

最終呈現給運營人員看到的數據是這樣的:

新增-活躍表

用戶留存表

簡單解釋一下上面兩個表的結構:因為我們是按天統計的,所以日期都是以天為單位,用戶可能有不同的國家或地區,不同版本,不同手機型號等等,所以就有了各個維度。用戶留存表的數據要注意,比如今天是2022年2月9日,那么就只能統計到9號的新增,9號新增用戶的日次留存是10號才能統計到的,但是8號新增用戶的次日留存在今天(也就是9號)就統計出來了,所以留存的數據是一個階梯形狀的。

 

2.收集數據

為了方便介紹本方案,這里假設只有日期、國家、版本號三個維度。

收集數據的下一步是數據入庫,為了方便,需要把數據格式進行轉換。因為服務端接口現在一般都是使用json格式的數據進行通信,如果直接把json格式的數據輸出到日志文件,處理起來會非常麻煩,所以需要在服務端接收到統計日志時,把數據輸出到單獨的日志文件中,還要按照MySQL的load命令可以識別的數據格式。

 

在輸出日志之前,先確定好都需要哪些數據,這里需要的數據如下:

  • ts:timestamp,時間戳。服務端接收到日志的時間,格式是yyyy-MM-DD HH:mm:ss。

  • device_id:設備id,這里是用來唯一標識用戶的一個字符串,比如在android設備上可以用android id,總之這個字段是用來確定一台設備的,要保證不同的設備設備id不同。

  • country:用戶所在的國家。如果你是只做一個國家的,比如只做國內市場,也可以把這個字段換成省份或者城市,總之根據運營需求去改變。

  • version:應用版本號,一般是一個整數。

 

於是就可以確定日志的格式如下:

2022-02-09 13:14:15||aaaaaa||CN||100
2022-02-09 13:14:16||bbbbbb||US||100
2022-02-09 13:14:17||cccccc||NL||100

也就是一條數據占一行,字段之間使用雙豎線分隔,當然這里不一定是雙豎線分隔,也可以換成其它的,原則是字符數少而且不能被字段的值包含,不然在數據入庫時會出現字段不對應的問題。

再考慮兩個方面:

  • 如果數據量較大要怎么處理?

  • 可能有的字段的長度沒法一下子確定怎么處理?

  • 保留數據的策略應該怎樣設置?

第1個問題,當數據量大時,可以考慮把日志文件切割成更小的時間段,比如每小時一個日志文件,然后下一小時就把上一個小時的數據入庫。

第2個問題,原始數據表的字段長度定義得大一些,做到即使以后字段有變化,也可以適應。

第3個問題,因為我們的目標是跑出最后的報表,所以不可能一直保存着所有的原始日志數據,為了防止出錯,可能只是保留最近幾天的,一個簡單的策略是在每次日志數據入庫前用delete語句把前幾天的數據刪除了,但是直接使用delete有兩個問題:一是MySQL要掃描全表刪除數據,比較耗時;二是MySQL的delete + where刪除可能只是假刪除,磁盤不會立即釋放。

所以這里使用分區表來實現,每天的數據作為一個分區,刪除數據時直接刪除分區,數據入庫時先創建分區。

於是得到原始數據表的DDL如下:

CREATE TABLE `st_base` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `dd` int(11) NOT NULL DEFAULT '0' COMMENT '天數,格式是yyyyMMddHH',
  `ts` varchar(64) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '時間戳',
 `device_id` varchar(64) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '設備id',
 `country` varchar(64) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '國家',
 `version` varchar(64) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '版本號',
 PRIMARY KEY (`id`,`dd`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='原始數據表'
/*!50100 PARTITION BY LIST (dd)
(PARTITION p20220209 VALUES IN (20220209) ENGINE = InnoDB) */

3.數據入庫

 

有了格式化的日志文件和數據表,就可以通過shell腳本把數據入庫了。步驟如下:

 

  • 刪除歷史日志的分區

  • 刪除執行日期的分區(這一步在重跑數據很有用)

  • 創建執行日期的分區

  • 使用MySQL的load命令把數據從日志文件加載到數據庫中

這里只說一下重要的命令:

 

1,刪除和創建分區可以分別使用下面兩個命令:

drop_sql="alter table st_base drop partition pxxxxxxxx" # 這里的xxxxxxxx要根據執行日期轉換一下
add_sql="alter table st_base add partition (partition pxxxxxxxx values in (xxxxxxxx) engine=innodb)"

mysql -u${username} -p${password} -D${database} -e "${drop_sql}"

mysql -u${username} -p${password} -D${database} -e "${add_sql}"

上面使用mysql命令指定了用戶名、密碼、數據庫名和sql語句(-e參數)

 

2.從文件加載數據入庫

log_file=xxxx #日志文件名
dd=xxxxxxxx #執行日期
load_sql="load data local infile '${log_file}' ignore into table st_base fields terminated by '||' lines terminated by '\n' (ts,device_id,country,version) set dd='${dd}'"
mysql -u${username} -p${password} -D${database} -e "${load_sql}"

3.定時任務

因為我們是每天入庫一次,所以可以在每天的0時10分去跑上面的腳本任務。假設上面的腳本文件保存為st_base.sh

可以通過crontab -e編輯定時任務:

10 0 * * * /path/to/job/st_base.sh

當然最好的做法是把執行日期當做腳本的參數傳入,這樣可以實現重跑某天的數據了。

 

4.清洗數據

在上一步得到了原始數據之后,接下來的工作都可以在MySQL中完成,首先要清洗數據。

這一步的目的有兩個:

  • 確定好數據類型

  • 數據去重

先創建一個臨時表tmp_base,這個表用來轉換數據類型,如果有一些字段的值需要轉換的也可以在這里做(舉個例子:假如客戶端獲取到的國家有幾種途徑,分別是獲取了sim卡國家,網絡國家,手機國家,到了服務端后服務器根據客戶端的ip也解析出了一個國家,但是運營的時候可能只需要一個最接近用戶的真實國家,那么就可以按照優先級來確定,當然本文沒有多個國家的問題),DDL如下:

CREATE TABLE `tmp_base` (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `dt` date NOT NULL COMMENT '日期',
    `device_id` VARCHAR ( 32 ) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '設備id',
    `country` VARCHAR ( 8 ) COLLATE utf8_bin DEFAULT NULL,
    `version` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '版本號',
PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT = '用戶基礎臨時表'

再創建一個用戶總表total_base,這個表用來存放所有用戶的數據,每個用戶只有一條數據,DDL如下:

CREATE TABLE `total_base` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `dt` date NOT NULL COMMENT '新增日期',
  `device_id` varchar(32) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '設備id',
 `country` varchar(8) COLLATE utf8_bin DEFAULT NULL,
  `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本號',
  PRIMARY KEY (`id`),
  UNIQUE KEY `device` (`device_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='用戶總表';

創建一個流水表flow_base,同樣以日期作為分區字段,DDL如下:

CREATE TABLE `flow_base` (
    `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `dt` date NOT NULL DEFAULT '2022-01-01' COMMENT '日期',
    `device_id` VARCHAR ( 32 ) COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '設備id',
    `country` VARCHAR ( 8 ) COLLATE utf8_bin DEFAULT NULL,
    `version` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '版本號',
    `rdt` date NOT NULL DEFAULT '2022-01-01' COMMENT '用戶注冊日期',
    `dd` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '日期(yyyyMMdd),用來做分區',
    PRIMARY KEY ( `id`, `dd` ),
UNIQUE KEY `unique` ( `dt`, `device_id`, `dd` ) 
) ENGINE = INNODB CHARSET = utf8 COLLATE = utf8_bin COMMENT = '用戶基礎流水表' 
/*!50100 PARTITION BY LIST ( dd ) ( PARTITION p20220209 VALUES IN ( 20220209 ) ENGINE = InnoDB ) */

注意到流水表flow_base中有一個rdt的字段,這字段是用來存放這個用戶的注冊日期,方便后面統計留存使用的。

准備好表結構之后,開始清洗數據。清洗數據使用MySQL的存儲過程功能,創建一個存儲過程sp_data_cleaning,在這個存儲過程中,需要做以下幾件事:

  • 把原始數據表st_base中的數據清洗到臨時表tmp_base,如果有字段的值需要轉換也在這一步做。

  • 把臨時表tmp_base中的用戶添加到用戶總表total_base中。

  • 把臨時表tmp_base中的數據添加到流水表中,並且聯合用戶總表,把用戶的注冊日期也填充好。

於是可以得到存儲過程sp_data_cleaning的DDL如下:

CREATE PROCEDURE `sp_data_cleaning`(IN v_dt VARCHAR(10))
BEGIN
 # 變量
 declare pname varchar(10);
 declare v_is_pname_exists int;
 
 # 清除tmp_base表數據
    truncate table tmp_base;


 # 清洗數據
  insert into tmp_base(
   `dt`,
    `device_id`,
   `country`,
    `version`
   )
  select
   v_dt,
   `device_id`,
    `country`,
    `version`
   from `st_base`
   where `dd` = replace(v_dt,'-','');
   
 # 數據加入用戶總表
  insert ignore into total_base(
   `dt`,
   `device_id`,
   `country`,
   `version`
   )
  select
   `dt`,
   `device_id`,
   `country`,
   `version`
  from tmp_base;
  
 # 給流水表創建分區
 select concat('p', replace(v_dt, '-', '')) into pname;
 
 # 查找是否已經存在執行日期的分區
 select max(a) into v_is_pname_exists
 from (
  select 1 as a from information_schema.PARTITIONS 
  where `TABLE_SCHEMA` = 'your_database_name' 
   and `TABLE_NAME` = 'flow_base'
   and `PARTITION_NAME`=pname
  union all
  select 0 
  ) t;
 
 # 如果已經存在先刪除
 if v_is_pname_exists=1 then
 set @drop_sql=concat('alter table flow_base drop partition ', pname);
 prepare stmt from @drop_sql;
 execute stmt;
 deallocate prepare stmt;
 end if;


 # 創建分區
 set @add_sql=concat('alter table flow_base add partition (partition ', pname, ' values in (', v_date, ') ENGINE = InnoDB)');
 prepare stmt from @add_sql;
 execute stmt;
 deallocate prepare stmt;  


 # 數據加入流水表
  insert ignore into flow_base(
   `dt`,
   `device_id`,
   `country`,
   `version`,
   `rdt`,
   `dd`
   )
  select
   v_dt,
   t1.`device_id`,
   t1.`country`,
   t1.`version`,
   t2.`dt`,
   replace(v_dt, '-', '')
  from tmp_base t1
  left outer join total_base t2
  on (t1.`device_id`=t2.`device_id`);
END

五、數據聚合

經過上面幾個步驟的處理,現在已經得到了半成品的數據,可以進行聚合了。根據第一步的目標報表,可以確定兩個表的結構:一個是用戶的新增-活躍表,另一個是用戶的留存表,DDL如下:

新增-活躍表:

CREATE TABLE `rpt_base_active` (
    `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `dt` date NOT NULL DEFAULT '2022-01-01' COMMENT '日期',
    `country` VARCHAR ( 8 ) COLLATE utf8_bin DEFAULT NULL,
    `version` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '版本號',
    `new_users` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '新增數',
    `active_users` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '活躍數',
    PRIMARY KEY ( `id` ),
KEY `index1` ( `dt` ),
KEY `index3` ( `country`, `version` )) ENGINE = INNODB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT = '用戶新增活躍表'

用戶留存表(這里假設只看7天的留存情況,如果需要看更多留存天數,可以自行修改):

CREATE TABLE `rpt_base` (
    `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT COMMENT 'ID',
    `dt` date NOT NULL DEFAULT '2022-01-01' COMMENT '日期',
    `country` VARCHAR ( 8 ) COLLATE utf8_bin DEFAULT NULL,
    `version` INT ( 11 ) NOT NULL DEFAULT '0' COMMENT '版本號',
    `d0` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '新增數',
    `d1` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '次日留存數',
    `d2` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '2日留存數',
    `d3` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '3日留存數',
    `d4` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '4日留存數',
    `d5` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '5日留存數',
    `d6` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '6日留存數',
    `d7` SMALLINT ( 4 ) NOT NULL DEFAULT '0' COMMENT '7日留存數',
    PRIMARY KEY ( `id` ),
    KEY `index1` ( `dt` ),
KEY `index3` ( `country`, `version` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8 COLLATE = utf8_bin COMMENT = '用戶留存表'

注意,以上兩個表的索引創建並不是固定的,需要根據運營的實際情況去創建相關的索引。

在跑數據之前,先聚合一下執行日期的數據,創建一個臨時表a_flow_base,這個表用來初步聚合數據,DDL如下:

CREATE TABLE `a_flow_base` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `dt` date NOT NULL DEFAULT '2022-01-01' COMMENT '日期',
 `country` varchar(8) COLLATE utf8_bin DEFAULT NULL,
  `version` int(11) NOT NULL DEFAULT '0' COMMENT '應用版本號',
  `rdt` date NOT NULL DEFAULT '2022-01-01' COMMENT '用戶注冊日期',
  `rdays` smallint(4) NOT NULL DEFAULT '0' COMMENT '留存天數',
  `users` smallint(4) NOT NULL DEFAULT '0' COMMENT '用戶數',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='用戶基礎數據聚合表'

首先初步聚合用戶數據,創建一個存儲過程sp_a_flow_base,DDL如下:

CREATE PROCEDURE `sp_a_flow_base`(in v_dt char(10))
BEGIN
 declare d0 date;
 declare d1 date;
 declare d2 date;
 declare d3 date;
 declare d4 date;
 declare d5 date;
 declare d6 date;
 declare d7 date;
 
 select date_sub(v_dt, interval 0 day) into d0;
 select date_sub(v_dt, interval 1 day) into d1;
 select date_sub(v_dt, interval 2 day) into d2;
 select date_sub(v_dt, interval 3 day) into d3;
 select date_sub(v_dt, interval 4 day) into d4;
 select date_sub(v_dt, interval 5 day) into d5;
 select date_sub(v_dt, interval 6 day) into d6;
 select date_sub(v_dt, interval 7 day) into d7;
 
 # 清除a_flow_base表數據
 truncate table a_flow_base;
 
 insert into a_flow_base(
  `dt`,
  `country`,
   `version_code`,
   `rdt`,
   `rdays`,
   `users`
  )
 select 
  t1.`dt`,
   t1.`country`,
   t1.`version`,
   t1.`rdt`,
   datediff(t1.`dt`, t1.`rdt`) as rdays,
   count(*) as users
 from flow_base t1
 where t1.`dt` in (d0,d1,d2,d3,d4,d5,d6,d7)
 group by 
  t1.`dt`,
   t1.`country`,
   t1.`version`,
   t1.`rdt`;
END

初步聚合了數據后,開始正式聚合數據,創建一個存儲過程sp_rpt_base,DDL如下:

CREATE PROCEDURE `sp_rpt_base`(in v_dt char(10))
BEGIN
 declare d0 date;
 declare d1 date;
 declare d2 date;
 declare d3 date;
 declare d4 date;
 declare d5 date;
 declare d6 date;
 declare d7 date;
 
 select date_sub(v_dt, interval 0 day) into d0;
 select date_sub(v_dt, interval 1 day) into d1;
 select date_sub(v_dt, interval 2 day) into d2;
 select date_sub(v_dt, interval 3 day) into d3;
 select date_sub(v_dt, interval 4 day) into d4;
 select date_sub(v_dt, interval 5 day) into d5;
 select date_sub(v_dt, interval 6 day) into d6;
 select date_sub(v_dt, interval 7 day) into d7;
 
 # 刪除數據
 delete from rpt_base_active where `dt` = v_dt;
 
 insert into rpt_base_active (
  `dt`,
  `country`,
   `version`,
   `new_users`,
   `active_users`
  )
 select
  `dt`,
  `country`,
  `version`,
  sum(if(`dt`=`rdt`, 1, 0)) as `new_users`,
  sum(1) as `active_users`
 from flow_base
 where dt=v_dt
 group by 
  `dt`,
  `country`,
  `version`
  ;


 # 刪除數據
 delete from rpt_base where `dt` in (d0,d1,d2,d3,d4,d5,d6,d7);
 
 insert into rpt_base(
  `dt`,
  `country`,
   `version`,
   `d0`,
   `d1`,
   `d2`,
   `d3`,
   `d4`,
   `d5`,
   `d6`,
   `d7`
  )
 select
  t1.`rdt`,
  t1.`country`,
  t1.`version`,
  sum(case when t1.`rdays`=0 then t1.`users` else 0 end) as d0,
  sum(case when t1.`rdays`=1 then t1.`users` else 0 end) as d1,
  sum(case when t1.`rdays`=2 then t1.`users` else 0 end) as d2,
  sum(case when t1.`rdays`=3 then t1.`users` else 0 end) as d3,
  sum(case when t1.`rdays`=4 then t1.`users` else 0 end) as d4,
  sum(case when t1.`rdays`=5 then t1.`users` else 0 end) as d5,
  sum(case when t1.`rdays`=6 then t1.`users` else 0 end) as d6,
  sum(case when t1.`rdays`=7 then t1.`users` else 0 end) as d7
  from a_flow_base t1
  group by
   t1.`rdt`,
   t1.`country`,
    t1.`version`
   ;
   
END

為了方便調用整個過程,可以再創建一個存儲過程,把全過程寫在一起,一次執行。創建一個存儲過程sp_user,DDL如下:

CREATE PROCEDURE `sp_user`(in v_dt char(10))
BEGIN
 call sp_tmp_base(v_dt);
 call sp_data_cleaning(v_dt);
 call sp_a_flow_base(v_dt);
 call sp_rpt_base(v_dt);
 
END

這樣,就可以添加定時任務每天定時跑前一天的數據了。

 

# 寫在最后

流水表flow_base應該保留幾天的數據?

這個看你的用戶留存表需要看多少天留存數據,如果你要看7日留存,那么保留最近8天的數據,如果是想看30天留存,就保留最近31天的數據,依次類推。

如果運營人員或老板9點半上班,每天凌晨的0點開始跑前一天的數據,你將有9個半小時來跑前一天的數據。當然如果一天的數據要跑2個小時以上,還是考慮用Hadoop來做吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM