如何用GO實現一個tail -f功能以及相應的思維發散


此文已由作者楊望暑授權網易雲社區發布。

歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。


背景

在服務端查看log會經常使用到tail -f命令實時跟蹤文件變化. 那么問題來了, 如果自己寫一個同樣功能的, 該何處寫起呢? 如果你用過ELK里的beats/filebeat的話, 應該知道filebeat做的事情就是監控日志變化, 並把最新數據,按照自定義配置處理后, 發送給ElasticSearch/kafka/... 對, 本文就是想介紹如何自己實現一個簡易版filebeat, 只要日志內容發生變化(append new line), 能觸發一個消息, 實現對這一行數據的預處理, 打印, 接入kafka等動作, 還有一個功能是, 當這個工具重啟后, 依然能從上次讀取的位置開始讀.


工具

Golang IDEA


大致流程


Alt pic


具體實現

從流程圖中可以看出, 我們需要解決下面幾個問題

  1. 記錄上一次程序關閉前,文件讀取位置,下次程序啟動時候加載這個位置信息.

  2. 文件定位並按行讀取, 並發布讀取的行

  3. 監測文件內容變化,並發出通知


記錄上次讀取位置

這個問題關鍵應該是什么時候記錄上次讀取的offset.

  1. 讀取並發布后記錄 如果發布后,做記錄前,程序掛了,那么重啟程序后,這行數據會重新被讀一次.

  2. 讀取后馬上記錄,記錄成功后,才對外發布. 這樣會產生另一個問題, 發布前程序掛了, 重啟后, 那條未必發送的消息,外部是拿不到了.


如果沒理解錯, elastic的filebeat選的就是第一種,且沒做相應的異常處理, 他是設置一個channel池, 接收並異步寫入位置信息, 如果寫入失敗, 則打印一條error日志就繼續走了

logp.Err("Writing of registry returned error: %v. Continuing...", err)


文件定位並按行讀取, 並發布讀取的行

要讀取一個文件, 首先要有一個reader

func (tail *Tailf) openReader() {
    tail.file, _ = os.Open(tail.FileName)
    tail.reader = bufio.NewReader(tail.file)
}


對於從文件位置(offset)=0處開始讀一行, 這沒什么問題, 直接用下面這個方法就可以了.

func (tail *Tailf) readLine() (string, error) {
    line, err := tail.reader.ReadString('\n')    if err != nil {        return line, err
    }
    line = strings.TrimRight(line, "\n")    return line, err
}


但是, 對於文件內容增加了, 但是還沒到一行,也就是沒出現\n 卻出現了EOF(end of file), 那這個情況下, 我們是要等待的,offset必須保持在這一行的行頭.

func (tail *Tailf) getOffset() (offset int64, err error) {
    offset, err = tail.file.Seek(0, os.SEEK_CUR)
    offset -= int64(tail.reader.Buffered())    return}func (tail *Tailf) beginWatch() {
    tail.openReader()    var offset int64
    for {       //取上一次讀取位置(行頭)
        offset, _ = tail.getOffset()
        line, err := tail.readLine()        if err == nil {
            tail.publishLine(line)
        } else if err == io.EOF {            //讀到了EOF, offset設置回到行頭
            tail.seekTo(Seek{offset: offset, whence: 0})            //block and wait for changes
            tail.waitChangeEvent()
        } else {
            fmt.Println(err)            return
        }
    }
}func (tail *Tailf) seekTo(pos Seek) error {
    tail.file.Seek(pos.offset, pos.whence)    //一旦改變了offset, 這個reader必須reset一下才能生效
    tail.reader.Reset(tail.file)    return nil}// 這里是發布一個消息, 因為是demo,所以只是簡單的往channel里一扔func (tail *Tailf) publishLine(line string) {
    tail.Lines <- line
}


下面說說waitChangeEvent


如何監視文件內容變化,並通知

監測文件內容增加的方式大體有2種

  1. 監測文件最后修改時間以及文件大小的變化,俗稱poll--輪詢

  2. 利用linux的inotify命令實現監測,他會在文件發生狀態改變后觸發事件


這里采用第一種方式, filebeat也用的第一種. 我們自己怎么實現呢?

//currReadPos: 文件末尾的offset,也就是當前文件大小func (w *PollWatcher) ChangeEvent(currReadPos int64) (*ChangeEvent, error) {

    watchingFile, err := os.Stat(w.FileName)    if err != nil {        return nil, err
    }
    changes := NewChangeEvent()    //當前的大小
    w.FileSize = currReadPos    //之前的修改時間
    previousModTime := watchingFile.ModTime()    //輪詢
    go func() {
        previousSize := w.FileSize        for {
            time.Sleep(POLL_DURATION)            //這里省略很多代碼, 假設文件是存在的,且沒被重命名,刪除之類的情況, 文件是像日志文件一樣不斷append的 
            file, _ := os.Stat(w.FileName)        // ... 省略一大段代碼
            if previousSize > 0 && previousSize < w.FileSize {                //文件肥了
                changes.NotifyModified()
                previousSize = w.FileSize                continue
            }

            previousSize = w.FileSize            // 處理 原本沒內容, 但是加入了內容, 所以要用修改時間
            modTime := file.ModTime()            if modTime != previousModTime {
                previousModTime = modTime
                changes.NotifyModified()
            }
        }
    }()    return changes, nil}


這里的changes.NotifyModified方法只是往下面實例里Modified Channel 放入 ce.Modified <- true


type ChangeEvent struct {
    Modified  chan bool
    Truncated chan bool
    Deleted   chan bool}


也正是這個動作, 在主線程中, 就能收到文件被修改的通知, 從而繼續出發readLine動作


// 上面有個beginWatch方法代碼,結合這個代碼來看func (tail *Tailf) waitChangeEvent() error {    // ... 省略初始化動作
    select {    //只測試文件內容增加
    case <-tail.changes.Modified:
        fmt.Println(">> find Modified")        return nil
    // ... 省略其他
    }
}


有了這個一連串的代碼后, 我們就能在main里監視文件變化了


func main() {
    t, _ := tailf.NewTailf("/Users/yws/Desktop/test.log")    for line := range t.Lines {    //這里會block住,有新行到來,就會輸出新行
        fmt.Println(line)
    }
}


擴展點

這個擴展點, 和filebeat一樣.


  1. 在讀取時候, 不一定是按行讀取,可以讀多行,json解析等

  2. 發布時候, 本文例子是直接寫console, 其實可以接kafka, redis, 數據庫等

  3. .... 想不出來了


總結

雖然是一個很簡單的功能, 現代主流服務端編程語言基本都能實現, 但為什么用go來實現呢? 一大堆優點和缺點就不列了..這不是軟文. 談談go初學者的看法


  1. 代碼很簡潔, 雖然不支持很多高級語言特性, 但看起來依然那么爽, 除了那些過渡包裝的struct以及怪異的取名.

  2. 寫並發(goroutine)是那么的簡單,那么的優雅,但也很容易被我這樣的菜鳥濫用, 這語言debug目前有點肉痛

  3. goroutine通信也是那么的簡單, channel設計的很棒, 用着很爽

  4. 不爽的地方, 多返回值的問題, 寫慣了java的xinstance.method(yInstance.method()), 當yInstance.method()是多返回值的時候,必須拆分成2行或更多, 每次編譯器報錯時候就想砸鍵盤.


參考資料


  1. https://github.com/elastic/beats filebeat只是其中一個feature

  2. https://github.com/hpcloud/tail 寫到一半發現原來別人也干過一樣的事了, 代碼基本大同小異, 有興趣的可以看他的代碼, 寫的更完善.



網易雲免費體驗館,0成本體驗20+款雲產品! 

更多網易技術、產品、運營經驗分享請點擊


相關文章:
【推薦】 網易七魚 Android 高性能日志寫入方案
【推薦】 【專家坐堂】四種並發編程模型簡介
【推薦】 消息中間件客戶端消費控制實踐


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM