留存率是用於反映網站、互聯網應用或網絡游戲的運營情況的統計指標,其具體含義為在統計周期(周/月)內,每日活躍用戶數在第N日仍啟動該App的用戶數占比的平均值。其中N通常取2、4、8、15、31,分別對應次日留存率、三日留存率、周留存率、半月留存率和月留存率。
留存率常用於反映用戶粘性,當N取值越大、留存率越高時,用戶粘性越高。
公式
新增用戶留存率=新增用戶中登錄用戶數/新增用戶數*100%(一般統計周期為天)
新增用戶數:在某個時間段(一般為第一整天)新登錄應用的用戶數;
登錄用戶數:登錄應用后至當前時間,至少登錄過一次的用戶數;
第N日留存:指的是新增用戶日之后的第N日依然登錄的用戶占新增用戶的比例
第1日留存率(即“次留”):(當天新增的用戶中,新增日之后的第1天還登錄的用戶數)/第一天新增總用戶數;
第3日留存率:(當天新增的用戶中,新增日之后的第3天還登錄的用戶數)/第一天新增總用戶數;
第7日留存率:(當天新增的用戶中,新增日之后的第7天還登錄的用戶數)/第一天新增總用戶數;
第30日留存率:(當天新增的用戶中,新增日之后的第30天還登錄的用戶數)/第一天新增總用戶數;
- 偷個懶,算個“1日留存率”
分析
公式: 第1日留存率(即“次留”):(當天新增的用戶中,新增日之后的第1天還登錄的用戶數)/第一天新增總用戶數
從公式簡單推理,只需要拿到“前一天新增用戶數” 和 “前一天新增用戶數今日登錄數”
- 前一天新增用戶數: 要獲取新增的用戶數,必須要有全量的用戶列表,並且全量用戶列表中應該有每個用戶的注冊時間(任務開始時,需要昨天的用戶列表,計算當天的留存)
為了方便用戶列表放到 mysql 中,表結構如下:
create table user_info
(
id bigint auto_increment primary key,
user_id bigint null,
register_day varchar(10) null comment '注冊時間,格式yyyy-MM-dd',
create_time timestamp default CURRENT_TIMESTAMP null,
update_time timestamp default CURRENT_TIMESTAMP null
)
comment '用戶注冊時間';
user_info 表,全量用戶列表,有每個用戶的注冊時間,任務啟動時,加載全量用戶列表,並提取昨日注冊用戶
- 新增日之后的第1天還登錄的用戶數: 已經有了昨日注冊用戶,這部分用戶的登錄就很容易獲取
代碼
flink 最近這幾個版本用戶界面的變化也挺多的,寫個代碼做個記錄
任務 DAG 如下:
kafka source -> map 轉換 json 成對象 -> 提取 timestamp -> 提取 watermark -> key 所以數據到一個 process -> window -> trigger -> process 計算 -> kafka sink 輸出
Checkpoint 配置
// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(5 * 60 * 1000)
// 高級選項:
// 設置模式為精確一次 (這是默認值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5 * 60 * 1000)
// Checkpoint 必須在一分鍾內完成,否則就會被拋棄
env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)
// 允許兩個連續的 checkpoint 錯誤
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)
// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,這樣 checkpoint 在作業取消后仍就會被保留
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// storage path
val checkpointStorage = new FileSystemCheckpointStorage(checkpointPath)
env.getCheckpointConfig.setCheckpointStorage(checkpointStorage)
// rocksdb
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
Kafka Source/Sink
val kafkaSource = KafkaSource
.builder[KafkaSimpleStringRecord]()
.setBootstrapServers(bootstrapServer)
.setGroupId("ra")
.setTopics(topic)
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new SimpleKafkaRecordDeserializationSchema())
.build()
val kafkaSink = KafkaSink
.builder[String]()
.setBootstrapServers(bootstrapServer)
.setKafkaProducerConfig(Common.getProp)
.setRecordSerializer(KafkaRecordSerializationSchema.builder[String]()
.setTopic(sinkTopic)
.setKeySerializationSchema(new SimpleStringSchema())
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.build()
Kafka Source 自定義了一個 反序列化器 解析 Kafka 數據,將 TopicPartition、Offset、Key、Value、timestamp 添加到對象
public class KafkaSimpleStringRecord implements Serializable {
private static final long serialVersionUID = 4813439951036021779L;
// kafka topic partition
private final TopicPartition tp;
// record kafka offset
private final long offset;
// record key
private final String key;
// record timestamp
private final long timestamp;
// record value
private final String value;
}
Timestamp & Watermark
// default is IngestionTime, kafka source will add timestamp to StreamRecord,
// if not set assignAscendingTimestamps, use StreamRecord' timestamp, so is ingestion time
.assignAscendingTimestamps(userLog => DateTimeUtil.parse(userLog.ts).getTime)
// create watermark by all elements
.assignTimestampsAndWatermarks(new WatermarkStrategy[UserLog] {
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserLog] = {
new WatermarkGenerator[UserLog] {
var watermark: Watermark = new Watermark(Long.MinValue)
override def onEvent(event: UserLog, eventTimestamp: Long, output: WatermarkOutput): Unit = {
val timestamp = DateTimeUtil.parse(event.ts).getTime
watermark = new Watermark(timestamp - 1)
output.emitWatermark(watermark)
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(watermark)
}
}
}
})
從數據中提取 ts 字段轉為 Long 類型,作為數據的 timestamp
自定義 WatermarkGenerator,從每個輸入數據中提取 ts 字段,創建 watermark(Flink 內部會過濾 當前 watermark 小於上一個 watermark 的 watermark)
窗口
天的窗口和10 分鍾的 trigger
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(10 * 60)))
process 計算
open: 創建 狀態對象
process: 從當日數據中計算昨日新增用戶進入登錄
clear: 每個窗口結束的時候,清空昨日新增用戶map,將今日新增用戶放入昨日新增用戶map和全部用戶map
open 方法
啟動時調用,創建狀態對象,連接 mysql,加載歷史用戶列表
/**
* open: load all user and last day new user
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
LOG.info("RetentionAnalyzeProcessFunction open")
// create state
allUserState = getRuntimeContext.getState(new ValueStateDescriptor[util.HashMap[String, Int]]("allUser", classOf[util.HashMap[String, Int]]))
lastUserState = getRuntimeContext.getState(new ValueStateDescriptor[util.HashMap[String, Int]]("lastUser", classOf[util.HashMap[String, Int]]))
currentUserState = getRuntimeContext.getState(new ValueStateDescriptor[util.HashMap[String, Int]]("currentUser", classOf[util.HashMap[String, Int]]))
// connect mysql
reconnect()
// load history user
loadUser()
}
clear 方法
每個窗口結束的時候調用,用於處理窗口結束事件,在這里處理了一下 當天、昨日、全量用戶map, side 當日用戶
/**
* output current day new user
*
* @param context
*/
override def clear(context: Context): Unit = {
val window = context.window
LOG.info(String.format("window start : %s, end: %s, clear", DateTimeUtil.formatMillis(window.getStart, DateTimeUtil.YYYY_MM_DD_HH_MM_SS), DateTimeUtil.formatMillis(window.getEnd - 1, DateTimeUtil.YYYY_MM_DD_HH_MM_SS)))
// clear last user, add current user as last/all user map
lastUserMap.clear()
lastUserMap.putAll(currentUser)
allUserMap.putAll(currentUser)
lastUserState.update(lastUserMap)
allUserState.update(allUserMap)
// export current user to mysql userInfo
// exportCurrentUser(window)
val day = DateTimeUtil.formatMillis(window.getStart, DateTimeUtil.YYYY_MM_DD)
context.output(sideTag, (day, currentUser))
// currentUser.keySet().forEach(item => {
// context.output(sideTag, (day, item))
// })
// clear current user
currentUser.clear()
}
process 方法
創建一個 HashMap 用了存放昨日新增用戶進入的登錄情況,一個用戶可以多次登錄,所以用 map 重復的直接去掉
把不在 全量用戶map 里的 user_id 放到 當天用戶 map
// loop window element, find last user and current user
val it = elements.iterator
val lastUserLog = new util.HashMap[String, Int]
while (it.hasNext) {
val userLog = it.next()
if (lastUserMap.containsKey(userLog.userId)) {
lastUserLog.put(userLog.userId, 1)
}
if (!allUserMap.containsKey(userLog.userId)) {
currentUser.put(userLog.userId, 1)
}
}
如果昨日用戶map 為 null,留存率為 0,反之,用 “昨日新增用戶今日登錄數 / 昨日新增用戶數”(取 double)
var str: String = null
var retention: Double = 0
if (!lastUserMap.isEmpty) {
retention = lastUserLog.size().toDouble / lastUserMap.size()
}
str = day + "," + time + "," + allUserMap.size() + "," + lastUserMap.size() + "," + currentUser.size() + "," + retention
out.collect(str)
side 當日新增用戶
寫 mysql 有點慢,要比較長時間,就改用 side output 輸出,還是慢(是 mysql 的原因,可以先寫到 kafka,再轉到 mysql),就這樣了
// sink current day user to mysql, cost a lot time
val sideTag = new OutputTag[(String, util.HashMap[String, Int])]("side")
val jdbcSink = JdbcSink
.sink("insert into user_info(user_id, login_day) values(?, ?)", new JdbcStatementBuilder[(String, String)] {
override def accept(ps: PreparedStatement, element: (String, String)): Unit = {
ps.setString(1, element._2)
ps.setString(2, element._1)
}
}, JdbcExecutionOptions.builder()
.withBatchSize(100)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/venn?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build())
stream.getSideOutput(sideTag)
.flatMap(new RichFlatMapFunction[(String, util.HashMap[String, Int]), (String, String)]() {
override def flatMap(element: (String, util.HashMap[String, Int]), out: Collector[(String, String)]): Unit = {
val day = element._1
val keySet = element._2.keySet()
keySet.forEach(item => {
out.collect((day, item))
})
}
})
.addSink(jdbcSink)
.name("jdbcSink")
.uid("jdbcSink")
測試數據
{"category_id":22,"user_id":"257054","item_id":"22732","behavior":"pv","ts":"2022-04-12 22:56:51.000"}
{"category_id":83,"user_id":"506782","item_id":"83818","behavior":"pv","ts":"2022-04-12 22:57:01.000"}
{"category_id":15,"user_id":"196658","item_id":"15335","behavior":"pv","ts":"2022-04-12 22:57:11.000"}
{"category_id":78,"user_id":"715098","item_id":"78131","behavior":"pv","ts":"2022-04-12 22:57:21.000"}
{"category_id":0,"user_id":"374494","item_id":"0714","behavior":"pv","ts":"2022-04-12 22:57:31.000"}
{"category_id":41,"user_id":"651691","item_id":"41995","behavior":"fav","ts":"2022-04-12 22:57:41.000"}
{"category_id":55,"user_id":"725849","item_id":"55589","behavior":"pv","ts":"2022-04-12 22:57:51.000"}
注: 為了方便測試,在模擬數據中給每條數據都遞增 10 s,並且視所有數據都是用戶登錄
輸出結果
逗號分割,依次為: 日期,時間,全量用戶數,昨日新增用戶數,今日新增用戶數,留存率
第一天執行時無昨日數據,流程率為 0
后續執行就有留存率
2022-04-25,12:00:00,0,0,873,0.0
2022-04-25,12:10:00,0,0,933,0.0
2022-04-25,12:20:00,0,0,993,0.0
.....
2022-04-26,00:40:00,5170,5170,240,1.9342359767891682E-4
2022-04-26,00:50:00,5170,5170,300,1.9342359767891682E-4
2022-04-26,01:00:00,5170,5170,359,3.8684719535783365E-4
2022-04-26,01:10:00,5170,5170,419,3.8684719535783365E-4
每天的數據,隨着登錄的用戶越來越多,留存率會越來越大
- 注: 每個窗口結束時會發現任務有積壓,是寫 mysql 比較慢,導致積壓的,可以先寫 kafka 再轉 mysql
全部代碼參考 Github flink-rookie
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文