Flink 實時計算留存率


留存率是用於反映網站、互聯網應用或網絡游戲的運營情況的統計指標,其具體含義為在統計周期(周/月)內,每日活躍用戶數在第N日仍啟動該App的用戶數占比的平均值。其中N通常取2、4、8、15、31,分別對應次日留存率、三日留存率、周留存率、半月留存率和月留存率。

留存率常用於反映用戶粘性,當N取值越大、留存率越高時,用戶粘性越高。

公式


新增用戶留存率=新增用戶中登錄用戶數/新增用戶數*100%(一般統計周期為天)

新增用戶數:在某個時間段(一般為第一整天)新登錄應用的用戶數;

登錄用戶數:登錄應用后至當前時間,至少登錄過一次的用戶數;

第N日留存:指的是新增用戶日之后的第N日依然登錄的用戶占新增用戶的比例

第1日留存率(即“次留”):(當天新增的用戶中,新增日之后的第1天還登錄的用戶數)/第一天新增總用戶數;

第3日留存率:(當天新增的用戶中,新增日之后的第3天還登錄的用戶數)/第一天新增總用戶數;

第7日留存率:(當天新增的用戶中,新增日之后的第7天還登錄的用戶數)/第一天新增總用戶數;

第30日留存率:(當天新增的用戶中,新增日之后的第30天還登錄的用戶數)/第一天新增總用戶數;

  • 偷個懶,算個“1日留存率”

分析

公式: 第1日留存率(即“次留”):(當天新增的用戶中,新增日之后的第1天還登錄的用戶數)/第一天新增總用戶數

從公式簡單推理,只需要拿到“前一天新增用戶數” 和 “前一天新增用戶數今日登錄數”

  • 前一天新增用戶數: 要獲取新增的用戶數,必須要有全量的用戶列表,並且全量用戶列表中應該有每個用戶的注冊時間(任務開始時,需要昨天的用戶列表,計算當天的留存)

為了方便用戶列表放到 mysql 中,表結構如下:

create table user_info
(
	id bigint auto_increment primary key,
	user_id bigint null,
	register_day varchar(10) null comment '注冊時間,格式yyyy-MM-dd',
	create_time timestamp default CURRENT_TIMESTAMP null,
	update_time timestamp default CURRENT_TIMESTAMP null
)
comment '用戶注冊時間';

user_info 表,全量用戶列表,有每個用戶的注冊時間,任務啟動時,加載全量用戶列表,並提取昨日注冊用戶

  • 新增日之后的第1天還登錄的用戶數: 已經有了昨日注冊用戶,這部分用戶的登錄就很容易獲取

代碼

flink 最近這幾個版本用戶界面的變化也挺多的,寫個代碼做個記錄

任務 DAG 如下:

kafka source -> map 轉換 json 成對象 -> 提取 timestamp -> 提取 watermark -> key 所以數據到一個 process -> window -> trigger -> process 計算 -> kafka sink 輸出

Checkpoint 配置


// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(5 * 60 * 1000)
// 高級選項:
// 設置模式為精確一次 (這是默認值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// 確認 checkpoints 之間的時間會進行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5 * 60 * 1000)

// Checkpoint 必須在一分鍾內完成,否則就會被拋棄
env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)

// 允許兩個連續的 checkpoint 錯誤
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)

// 同一時間只允許一個 checkpoint 進行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// 使用 externalized checkpoints,這樣 checkpoint 在作業取消后仍就會被保留
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// storage path
val checkpointStorage = new FileSystemCheckpointStorage(checkpointPath)
env.getCheckpointConfig.setCheckpointStorage(checkpointStorage)
// rocksdb
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))

Kafka Source/Sink


val kafkaSource = KafkaSource
      .builder[KafkaSimpleStringRecord]()
      .setBootstrapServers(bootstrapServer)
      .setGroupId("ra")
      .setTopics(topic)
      .setStartingOffsets(OffsetsInitializer.latest())
      .setDeserializer(new SimpleKafkaRecordDeserializationSchema())
      .build()

val kafkaSink = KafkaSink
      .builder[String]()
      .setBootstrapServers(bootstrapServer)
      .setKafkaProducerConfig(Common.getProp)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String]()
        .setTopic(sinkTopic)
        .setKeySerializationSchema(new SimpleStringSchema())
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
      )
      .build()      

Kafka Source 自定義了一個 反序列化器 解析 Kafka 數據,將 TopicPartition、Offset、Key、Value、timestamp 添加到對象

public class KafkaSimpleStringRecord implements Serializable {
    private static final long serialVersionUID = 4813439951036021779L;
    // kafka topic partition
    private final TopicPartition tp;
    // record kafka offset
    private final long offset;
    // record key
    private final String key;
    // record timestamp
    private final long timestamp;
    // record value
    private final String value;
}


Timestamp & Watermark


// default is IngestionTime, kafka source will add timestamp to StreamRecord,
// if not set assignAscendingTimestamps, use StreamRecord' timestamp, so is ingestion time
.assignAscendingTimestamps(userLog => DateTimeUtil.parse(userLog.ts).getTime)
// create watermark by all elements
.assignTimestampsAndWatermarks(new WatermarkStrategy[UserLog] {
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[UserLog] = {
  new WatermarkGenerator[UserLog] {
    var watermark: Watermark = new Watermark(Long.MinValue)

    override def onEvent(event: UserLog, eventTimestamp: Long, output: WatermarkOutput): Unit = {
      val timestamp = DateTimeUtil.parse(event.ts).getTime
      watermark = new Watermark(timestamp - 1)
      output.emitWatermark(watermark)
    }

    override def onPeriodicEmit(output: WatermarkOutput): Unit = {
      output.emitWatermark(watermark)
    }
  }
}
})

從數據中提取 ts 字段轉為 Long 類型,作為數據的 timestamp
自定義 WatermarkGenerator,從每個輸入數據中提取 ts 字段,創建 watermark(Flink 內部會過濾 當前 watermark 小於上一個 watermark 的 watermark)

窗口

天的窗口和10 分鍾的 trigger


.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(10 * 60)))

process 計算

open: 創建 狀態對象
process: 從當日數據中計算昨日新增用戶進入登錄
clear: 每個窗口結束的時候,清空昨日新增用戶map,將今日新增用戶放入昨日新增用戶map和全部用戶map

open 方法

啟動時調用,創建狀態對象,連接 mysql,加載歷史用戶列表


  /**
   * open: load all user and last day new user
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    LOG.info("RetentionAnalyzeProcessFunction open")
    // create state
    allUserState = getRuntimeContext.getState(new ValueStateDescriptor[util.HashMap[String, Int]]("allUser", classOf[util.HashMap[String, Int]]))
    lastUserState = getRuntimeContext.getState(new ValueStateDescriptor[util.HashMap[String, Int]]("lastUser", classOf[util.HashMap[String, Int]]))
    currentUserState = getRuntimeContext.getState(new ValueStateDescriptor[util.HashMap[String, Int]]("currentUser", classOf[util.HashMap[String, Int]]))

    // connect mysql
    reconnect()
    // load history user
    loadUser()
  }

clear 方法

每個窗口結束的時候調用,用於處理窗口結束事件,在這里處理了一下 當天、昨日、全量用戶map, side 當日用戶


  /**
   * output current day new user
   *
   * @param context
   */
  override def clear(context: Context): Unit = {
    val window = context.window
    LOG.info(String.format("window start : %s, end: %s, clear", DateTimeUtil.formatMillis(window.getStart, DateTimeUtil.YYYY_MM_DD_HH_MM_SS), DateTimeUtil.formatMillis(window.getEnd - 1, DateTimeUtil.YYYY_MM_DD_HH_MM_SS)))
    // clear last user, add current user as last/all user map
    lastUserMap.clear()
    lastUserMap.putAll(currentUser)
    allUserMap.putAll(currentUser)
    lastUserState.update(lastUserMap)
    allUserState.update(allUserMap)
    // export current user to mysql userInfo
    //    exportCurrentUser(window)
    val day = DateTimeUtil.formatMillis(window.getStart, DateTimeUtil.YYYY_MM_DD)
    context.output(sideTag, (day, currentUser))
    //    currentUser.keySet().forEach(item => {
    //      context.output(sideTag, (day, item))
    //    })
    // clear current user
    currentUser.clear()
  }


process 方法

創建一個 HashMap 用了存放昨日新增用戶進入的登錄情況,一個用戶可以多次登錄,所以用 map 重復的直接去掉

把不在 全量用戶map 里的 user_id 放到 當天用戶 map


// loop window element, find last user and current user
val it = elements.iterator
val lastUserLog = new util.HashMap[String, Int]
while (it.hasNext) {
  val userLog = it.next()
  if (lastUserMap.containsKey(userLog.userId)) {
    lastUserLog.put(userLog.userId, 1)
  }
  if (!allUserMap.containsKey(userLog.userId)) {
    currentUser.put(userLog.userId, 1)
  }
}

如果昨日用戶map 為 null,留存率為 0,反之,用 “昨日新增用戶今日登錄數 / 昨日新增用戶數”(取 double)


var str: String = null
var retention: Double = 0
if (!lastUserMap.isEmpty) {
  retention = lastUserLog.size().toDouble / lastUserMap.size()
}
str = day + "," + time + "," + allUserMap.size() + "," + lastUserMap.size() + "," + currentUser.size() + "," + retention
out.collect(str)

side 當日新增用戶

寫 mysql 有點慢,要比較長時間,就改用 side output 輸出,還是慢(是 mysql 的原因,可以先寫到 kafka,再轉到 mysql),就這樣了

// sink current day user to mysql, cost a lot time
val sideTag = new OutputTag[(String, util.HashMap[String, Int])]("side")
val jdbcSink = JdbcSink
  .sink("insert into user_info(user_id, login_day) values(?, ?)", new JdbcStatementBuilder[(String, String)] {
    override def accept(ps: PreparedStatement, element: (String, String)): Unit = {
      ps.setString(1, element._2)
      ps.setString(2, element._1)
    }
  }, JdbcExecutionOptions.builder()
    .withBatchSize(100)
    .withBatchIntervalMs(200)
    .withMaxRetries(5)
    .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:mysql://localhost:3306/venn?useUnicode=true&characterEncoding=utf8&useSSL=false&allowPublicKeyRetrieval=true")
      .withDriverName("com.mysql.cj.jdbc.Driver")
      .withUsername("root")
      .withPassword("123456")
      .build())

stream.getSideOutput(sideTag)
  .flatMap(new RichFlatMapFunction[(String, util.HashMap[String, Int]), (String, String)]() {
    override def flatMap(element: (String, util.HashMap[String, Int]), out: Collector[(String, String)]): Unit = {
      val day = element._1
      val keySet = element._2.keySet()
      keySet.forEach(item => {
        out.collect((day, item))
      })
    }
  })
  .addSink(jdbcSink)
  .name("jdbcSink")
  .uid("jdbcSink")

測試數據

{"category_id":22,"user_id":"257054","item_id":"22732","behavior":"pv","ts":"2022-04-12 22:56:51.000"}
{"category_id":83,"user_id":"506782","item_id":"83818","behavior":"pv","ts":"2022-04-12 22:57:01.000"}
{"category_id":15,"user_id":"196658","item_id":"15335","behavior":"pv","ts":"2022-04-12 22:57:11.000"}
{"category_id":78,"user_id":"715098","item_id":"78131","behavior":"pv","ts":"2022-04-12 22:57:21.000"}
{"category_id":0,"user_id":"374494","item_id":"0714","behavior":"pv","ts":"2022-04-12 22:57:31.000"}
{"category_id":41,"user_id":"651691","item_id":"41995","behavior":"fav","ts":"2022-04-12 22:57:41.000"}
{"category_id":55,"user_id":"725849","item_id":"55589","behavior":"pv","ts":"2022-04-12 22:57:51.000"}

注: 為了方便測試,在模擬數據中給每條數據都遞增 10 s,並且視所有數據都是用戶登錄

輸出結果

逗號分割,依次為: 日期,時間,全量用戶數,昨日新增用戶數,今日新增用戶數,留存率
第一天執行時無昨日數據,流程率為 0
后續執行就有留存率


2022-04-25,12:00:00,0,0,873,0.0
2022-04-25,12:10:00,0,0,933,0.0
2022-04-25,12:20:00,0,0,993,0.0
.....
2022-04-26,00:40:00,5170,5170,240,1.9342359767891682E-4
2022-04-26,00:50:00,5170,5170,300,1.9342359767891682E-4
2022-04-26,01:00:00,5170,5170,359,3.8684719535783365E-4
2022-04-26,01:10:00,5170,5170,419,3.8684719535783365E-4

每天的數據,隨着登錄的用戶越來越多,留存率會越來越大

  • 注: 每個窗口結束時會發現任務有積壓,是寫 mysql 比較慢,導致積壓的,可以先寫 kafka 再轉 mysql

全部代碼參考 Github flink-rookie

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM