Beats:如何創建一個定制的Elastic Beat


Beats作為Elastic Stack家族中重要的部分。它可以和方便地讓我們把我們的數據發送到Elasticsearch或Logstash之中。如果我們想要生成自己的Beat,請使用GitHub的beats倉庫中提供的Beat生成器。在今天的文章中,我們將詳細介紹如何一步一步地來創建一個我們自己想要的beat。

設置自己的開發環境

安裝go環境

Beats實際上是go程序。我們可以參照鏈接“Go get started”(https://golang.org/doc/install)來安裝自己的golang語言開發環境。等我們安裝好我們的go后,我們可以在terminal中打入如下的命令:

    $ which go
    /usr/local/go/bin/go

那么我們需要在我們的環境中設置如下的變量:

    export GOROOT=/usr/local/go
    export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
    export GOPATH=$HOME/go/beats

在這里,我也設置了以GOPATH。你可以設置自己的路徑。針對我的情況,我在我的home目錄下創建了一個go目錄,並在go目錄下生產一個叫做beats的目錄。在一下,我們會在這個目錄里生成我們的定制的beat。

下載Elastic beats源碼

在這一步我們下載Elastic beats的源碼。在termnial中打入如下的命令:

    mkdir -p ${GOPATH}/src/github.com/elastic
    git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats

安裝Python

目前generator只對Python2適用,所以,我們需要安裝Python2。我們可以參照頁面https://www.python.org/downloads/進行安裝我們的python2。
安裝virtualenv

我們必須安裝virtualenv才能使得generator正常工作。可以參照鏈接https://virtualenv.pypa.io/en/latest/installation/來進行安裝。如果自己的電腦上同時已經安裝了python3,那么我們需要同時設置如寫變量:

    export PYTHON_EXE='python2.7'
    export VIRTUALENV_PARAMS='-p python2.7'
    export VIRTUALENV_PYTHON='/usr/bin/python2.7'
     
    export VIRTUALENV_PYTHON='/usr/local/bin/python' (for Mac)

請注意:這里的python是2.x版本的python,而不是python3。我們需要保證VIRTUALENV_PYTHON指向我們的Python2的執行文件。

安裝mage

我們需要在地址https://github.com/magefile/mage下載這個源碼,並編譯:

    go get -u -d github.com/magefile/mage
    cd $GOPATH/src/github.com/magefile/mage
    go run bootstrap.go

等上面的命令執行完后,我們可以在如下的目錄中找到編譯好的執行文件mage:

    liuxg-2:bin liuxg$ ls $GOPATH/bin
    mage

創建定制beat

首先創建一個目錄在$GOPATH下,並進入該目錄。

    mkdir ${GOPATH}/src/github.com/{user}
    cd ${GOPATH}/src/github.com/{user}

注意這里的user指的是自己在github上的用戶名。比如針對我的情況是liu-xiao-guo。我打入如下寫的命令:

    mkdir ${GOPATH}/src/github.com/liu-xiao-guo
    cd  $GOPATH/src/github.com/elastic/beats/

接下來,我們運行如下的命令:

mage GenerateCustomBeat

執行結果:

    $ mage GenerateCustomBeat
    2019/11/13 15:24:01 Found Elastic Beats dir at /Users/liuxg/go/beats/src/github.com/elastic/beats
    Enter the beat name [examplebeat]: Countbeat
    Enter your github name [your-github-name]: liu-xiao-guo
    Enter the beat path [github.com/liu-xiao-guo/countbeat]: 
    Enter your full name [Firstname Lastname]: Xiaoguo Liu
    Enter the beat type [beat]: 
    DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support
    DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support
     
    WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ReadTimeoutError("HTTPSConnectionPool(host='pypi.tuna.tsinghua.edu.cn', port=443): Read timed out. (read timeout=15)",)': /simple/semver/
    2019/11/13 15:25:50 Found Elastic Beats dir at /Users/liuxg/go/beats/src/github.com/liu-xiao-guo/countbeat/vendor/github.com/elastic/beats
    Generated fields.yml for countbeat to /Users/liuxg/go/beats/src/github.com/liu-xiao-guo/countbeat/fields.yml
    2019/11/13 15:25:52 Found Elastic Beats dir at /Users/liuxg/go/beats/src/github.com/liu-xiao-guo/countbeat/vendor/github.com/elastic/beats
    Auto packing the repository in background for optimum performance.
    See "git help gc" for manual housekeeping.
    =======================
    Your custom beat is now available as /Users/liuxg/go/beats/src/github.com/liu-xiao-guo/countbeat
    =======================

這樣,我們基本上就生產了一個最基本的beat的框架。

接下來,我們進入到我們的beat目錄里,並進行編譯:

cd ${GOPATH}/src/github.com/{user}/countbeat

針對我的情況:

cd ${GOPATH}/src/github.com/liu-xiao-guo/countbeat

我們可以看一下里面最基本的文件:

    $ pwd
    /Users/liuxg/go/beats/src/github.com/liu-xiao-guo/countbeat
    liuxg-2:countbeat liuxg$ ls
    CONTRIBUTING.md		cmd			magefile.go
    LICENSE.txt		config			main.go
    Makefile		countbeat.docker.yml	main_test.go
    NOTICE.txt		countbeat.reference.yml	make.bat
    README.md		countbeat.yml		tests
    _meta			docs			vendor
    beater			fields.yml
    build			include

這里有最基本的框架文件。里面含有一個叫做countbeat.yml的配置文件及一些標准的模板文件。我們在命令行中直接打入如下的指令:

make

    $ make
    go build -i -ldflags "-X github.com/liu-xiao-guo/countbeat/vendor/github.com/elastic/beats/libbeat/version.buildTime=2019-11-13T07:33:25Z -X github.com/liu-xiao-guo/countbeat/vendor/github.com/elastic/beats/libbeat/version.commit=501bd87da668346f78398676c78b4a39394a3640"

經過上面的編譯,我們可以發現在當前的目錄下,有一個已經編譯好的countbeat可執行文件:

我們在當前的目錄下直接運行這個可執行的文件:

./countbeat -e -d "*"

我們可以在terminal中看到:

那么在我們的Kibana中也可以看到如下信息:

顯然數據已經被成功上傳到Elasticsearch中了。

每一個文檔的內容如下:

    {
      "@timestamp": "2019-11-13T07:38:57.095Z",
      "agent": {
        "version": "8.0.0",
        "type": "countbeat",
        "ephemeral_id": "d3f0638e-ee58-45ff-92cc-74f188fd66a4",
        "hostname": "liuxg-2.local",
        "id": "1d35220e-7f75-442a-88eb-43ec1e97f0d0"
      },
      "counter": 5,
      "ecs": {
        "version": "1.2.0"
      },
      "host": {
        "hostname": "liuxg-2.local",
        "architecture": "x86_64",
        "os": {
          "build": "19B88",
          "platform": "darwin",
          "version": "10.15.1",
          "family": "darwin",
          "name": "Mac OS X",
          "kernel": "19.0.0"
        },
        "id": "E51545F1-4BDC-5890-B194-83D23620325A",
        "name": "liuxg-2.local"
      },
      "type": "liuxg-2.local"
    }

它里面含有一個counter的整數值。

所有關於beat的設計上的代碼可以在目錄${GOPATH}/src/github.com/liu-xiao-guo/countbeat下的/beater/CountBeat.go文件里實現的。設計比較直接。大家可以看一下代碼應該可以明白。

讀取JSON文件beat

在上面我們已經熟悉了如何去創建一個template的beat。它是一個最基本的beat,並沒有什么特別的功能。在這節里,我們接着如法炮制來創建一個稍微有一點用途的beat。我們的這個beat叫做readjson beat。它的源碼可以按照如下的方法得到:

git clone https://github.com/liu-xiao-guo/beats-readjson

首先,我們可以准備一個我們想要的json文件,比如:

users.json

    {
      "users": [
        {
          "name": "Elliot",
          "type": "Reader",
          "age": 23,
          "social": {
            "facebook": "https://facebook.com",
            "twitter": "https://twitter.com"
          }
        },
        {
          "name": "Fraser",
          "type": "Author",
          "age": 17,
          "social": {
            "facebook": "https://facebook.com",
            "twitter": "https://twitter.com"
          }
        }
      ]
    }

我們可以把這個文件放入到我們如何喜歡的位置。針對我的情況,我把它置於我的電腦的如下位置:

/Users/liuxg/data/beats/users.json

我們可以在readjson.yml文件中進行配置:

readjson.yml

我們的readjson.go設計也相當簡單:

readjson.go

    package beater
     
    import (
    	"fmt"
    	"os"
    	"io/ioutil"
    	"encoding/json"
    	"strconv"
    	"time"
    	"os/signal"
        "syscall"
     
    	"github.com/elastic/beats/libbeat/beat"
    	"github.com/elastic/beats/libbeat/common"
    	"github.com/elastic/beats/libbeat/logp"
     
    	"github.com/liu-xiao-guo/readjson/config"
    )
     
    type Users struct {
        Users []User `json:"users"`
    }
     
    // User struct which contains a name
    // a type and a list of social links
    type User struct {
        Name   string `json:"name"`
        Type   string `json:"type"`
        Age    int    `json:"Age"`
        Social Social `json:"social"`
    }
     
    // Social struct which contains a
    // list of links
    type Social struct {
        Facebook string `json:"facebook"`
        Twitter  string `json:"twitter"`
    }
     
    // readjson configuration.
    type readjson struct {
    	done   chan struct{}
    	config config.Config
    	client beat.Client
    }
     
    // New creates an instance of readjson.
    func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    	c := config.DefaultConfig
    	if err := cfg.Unpack(&c); err != nil {
    		return nil, fmt.Errorf("Error reading config file: %v", err)
    	}
     
    	bt := &readjson{
    		done:   make(chan struct{}),
    		config: c,
    	}
    	return bt, nil
    }
     
    // Run starts readjson.
    func (bt *readjson) Run(b *beat.Beat) error {
    	logp.Info("readjson is running! Hit CTRL-C to stop it.")
    	var err error
    	bt.client, err = b.Publisher.Connect()
    	if err != nil {
    		return err
    	}
     
    	fmt.Println("Path: ", bt.config.Path)
    	fmt.Println("Period: ", bt.config.Period)
     
    	
    	// Open our jsonFile
    	jsonFile, err := os.Open(bt.config.Path)
    	// if we os.Open returns an error then handle it
    	if err != nil {
        	fmt.Println(err)
    	}
     
    	fmt.Println("Successfully Opened users.json")
    	// defer the closing of our jsonFile so that we can parse it later on
    	defer jsonFile.Close()
    	
     
    	byteValue, _ := ioutil.ReadAll(jsonFile)
     
    	// we initialize our Users array
        var users Users
     
        json.Unmarshal(byteValue, &users)
     
        // we iterate through every user within our users array and
        // print out the user Type, their name, and their facebook url
        // as just an example
        for i := 0; i < len(users.Users); i++ {
            fmt.Println("User Type: " + users.Users[i].Type)
            fmt.Println("User Age: " + strconv.Itoa(users.Users[i].Age))
            fmt.Println("User Name: " + users.Users[i].Name)
            fmt.Println("Facebook Url: " + users.Users[i].Social.Facebook)
     
            event := beat.Event{
    			Timestamp: time.Now(),
    			Fields: common.MapStr {
    				"ostype":    	b.Info.Name,
    				"name":		users.Users[i].Name,
    				"type":		users.Users[i].Type,
    				"age":		users.Users[i].Age,
    				"social":	users.Users[i].Social,
    			},
    		}
     
    		bt.client.Publish(event)
        }
     
        c := make(chan os.Signal)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
        go func() {
            <-c
            os.Exit(1)
        }()
     
        for {
            fmt.Println("sleeping...")
            time.Sleep(10 * time.Second)
        }	
    }
     
    // Stop stops readjson.
    func (bt *readjson) Stop() {
    	bt.client.Close()
    	close(bt.done)
    }

它在run method里把json文件讀入,並把它們分別發送出去到我們的Elasticsearch中。

我們按照上面的步驟進行編譯,並最終運行我們的readjson beat。

./readjson -e

我們可以在Kibana中看到我們已經發送上來的beat信息:

參考:
【1】https://www.elastic.co/guide/en/beats/devguide/7.5/newbeat-generate.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM