OpenFunction 應用系列之一: 以 Serverless 的方式實現 Kubernetes 日志告警


概述

當我們將容器的日志收集到消息服務器之后,我們該如何處理這些日志?部署一個專用的日志處理工作負載可能會耗費多余的成本,而當日志體量驟增、驟降時亦難以評估日志處理工作負載的待機數量。本文提供了一種基於 Serverless 的日志處理思路,可以在降低該任務鏈路成本的同時提高其靈活性。

我們的大體設計是使用 Kafka 服務器作為日志的接收器,之后以輸入 Kafka 服務器的日志作為事件,驅動 Serverless 工作負載對日志進行處理。據此的大致步驟為:

  1. 搭建 Kafka 服務器作為 Kubernetes 集群的日志接收器
  2. 部署 OpenFunction 為日志處理工作負載提供 Serverless 能力
  3. 編寫日志處理函數,抓取特定的日志生成告警消息
  4. 配置 Notification Manager 將告警發送至 Slack

在這個場景中,我們會利用到 OpenFunction 帶來的 Serverless 能力。

OpenFunction 是 KubeSphere 社區開源的一個 FaaS(Serverless)項目,旨在讓用戶專注於他們的業務邏輯,而不必關心底層運行環境和基礎設施。該項目當前具備以下關鍵能力:

  • 支持通過 dockerfile 或 buildpacks 方式構建 OCI 鏡像
  • 支持使用 Knative Serving 或 OpenFunctionAsync ( KEDA + Dapr ) 作為 runtime 運行 Serverless 工作負載
  • 自帶事件驅動框架

使用 Kafka 作為日志接收器

首先,我們為 KubeSphere 平台開啟 logging 組件(可以參考 啟用可插拔組件 獲取更多信息)。然后我們使用 strimzi-kafka-operator 搭建一個最小化的 Kafka 服務器。

  1. 在 default 命名空間中安裝 strimzi-kafka-operator

    helm repo add strimzi https://strimzi.io/charts/
    helm install kafka-operator -n default strimzi/strimzi-kafka-operator
    
  2. 運行以下命令在 default 命名空間中創建 Kafka 集群和 Kafka Topic,該命令所創建的 Kafka 和 Zookeeper 集群的存儲類型為 ephemeral,使用 emptyDir 進行演示。

    注意,我們此時創建了一個名為 “logs” 的 topic,后續會用到它

    cat <<EOF | kubectl apply -f -
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: kafka-logs-receiver
      namespace: default
    spec:
      kafka:
        version: 2.8.0
        replicas: 1
        listeners:
          - name: plain
            port: 9092
            type: internal
            tls: false
          - name: tls
            port: 9093
            type: internal
            tls: true
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          log.message.format.version: '2.8'
          inter.broker.protocol.version: "2.8"
        storage:
          type: ephemeral
      zookeeper:
        replicas: 1
        storage:
          type: ephemeral
      entityOperator:
        topicOperator: {}
        userOperator: {}
    ---
    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaTopic
    metadata:
      name: logs
      namespace: default
      labels:
        strimzi.io/cluster: kafka-logs-receiver
    spec:
      partitions: 10
      replicas: 3
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    EOF
    
  3. 運行以下命令查看 Pod 狀態,並等待 Kafka 和 Zookeeper 運行並啟動。

    $ kubectl get po
    NAME                                                   READY   STATUS        RESTARTS   AGE
    kafka-logs-receiver-entity-operator-568957ff84-nmtlw   3/3     Running       0          8m42s
    kafka-logs-receiver-kafka-0                            1/1     Running       0          9m13s
    kafka-logs-receiver-zookeeper-0                        1/1     Running       0          9m46s
    strimzi-cluster-operator-687fdd6f77-cwmgm              1/1     Running       0          11m
    

    運行以下命令查看 Kafka 集群的元數據:

    # 啟動一個工具 pod
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm
    # 查看 Kafka 集群的元數據
    $ kafkacat -L -b kafka-logs-receiver-kafka-brokers:9092
    

我們將這個 Kafka 服務器添加為日志接收器。

  1. admin 身份登錄 KubeSphere 的 Web 控制台。點擊左上角的平台管理,然后選擇集群管理

    如果您啟用了多集群功能,您可以選擇一個集群。

  2. 集群管理頁面,選擇集群設置下的日志收集

  3. 點擊添加日志接收器並選擇 Kafka。輸入 Kafka 代理地址和端口信息,然后點擊確定繼續。

  1. 運行以下命令驗證 Kafka 集群是否能從 Fluent Bit 接收日志:

    # 啟動一個工具 pod
    $ kubectl run utils --image=arunvelsriram/utils -i --tty --rm 
    # 檢查 logs topic 中的日志情況
    $ kafkacat -C -b kafka-logs-receiver-kafka-0.kafka-logs-receiver-kafka-brokers.default.svc:9092 -t logs
    

部署 OpenFunction

按照概述中的設計,我們需要先部署 OpenFunction。OpenFunction 項目引用了很多第三方的項目,如 Knative、Tekton、ShipWright、Dapr、KEDA 等,手動安裝較為繁瑣,推薦使用 Prerequisites 文檔 中的方法,一鍵部署 OpenFunction 的依賴組件。

其中 --with-shipwright 表示部署 shipwright 作為函數的構建驅動
--with-openFuncAsync 表示部署 OpenFuncAsync Runtime 作為函數的負載驅動
而當你的網絡在訪問 Github 及 Google 受限時,可以加上 --poor-network 參數用於下載相關的組件

sh hack/deploy.sh --with-shipwright --with-openFuncAsync --poor-network

部署 OpenFunction:

此處選擇安裝最新的穩定版本,你也可以使用開發版本,參考 Install 文檔

為了可以正常使用 ShipWright ,我們提供了默認的構建策略,可以使用以下命令設置該策略:

kubectl apply -f https://raw.githubusercontent.com/OpenFunction/OpenFunction/main/config/strategy/openfunction.yaml
kubectl apply -f https://github.com/OpenFunction/OpenFunction/releases/download/v0.3.0/bundle.yaml

編寫日志處理函數

我們以 創建並部署 WordPress 為例,搭建一個 WordPress 應用作為日志的生產者。該應用的工作負載所在的命名空間為 “demo-project”,Pod 名稱為 “wordpress-v1-f54f697c5-hdn2z”。

當請求結果為 404 時,我們收到的日志內容如下:

{"@timestamp":1629856477.226758,"log":"*.*.*.* - - [25/Aug/2021:01:54:36 +0000] \"GET /notfound HTTP/1.1\" 404 49923 \"-\" \"curl/7.58.0\"\n","time":"2021-08-25T01:54:37.226757612Z","kubernetes":{"pod_name":"wordpress-v1-f54f697c5-hdn2z","namespace_name":"demo-project","container_name":"container-nrdsp1","docker_id":"bb7b48e2883be0c05b22c04b1d1573729dd06223ae0b1676e33a4fac655958a5","container_image":"wordpress:4.8-apache"}}

我們的需求是:當一個請求結果為 404 時,發送一個告警通知給接收器(可以根據 配置 Slack 通知 配置一個 Slack 告警接收器),並記錄命名空間、Pod 名稱、請求路徑、請求方法等信息。按照這個需求,我們編寫一個簡單的處理函數:

你可以從 OpenFunction Context Spec 處了解 openfunction-context 的使用方法,這是 OpenFunction 提供給用戶編寫函數的工具庫
你可以通過 OpenFunction Samples 了解更多的 OpenFunction 函數案例

package logshandler

import (
	"encoding/json"
	"fmt"
	"log"
	"regexp"
	"time"

	ofctx "github.com/OpenFunction/functions-framework-go/openfunction-context"
	alert "github.com/prometheus/alertmanager/template"
)

const (
	HTTPCodeNotFound = "404"
	Namespace        = "demo-project"
	PodName          = "wordpress-v1-[A-Za-z0-9]{9}-[A-Za-z0-9]{5}"
	AlertName        = "404 Request"
	Severity         = "warning"
)

// LogsHandler ctx 參數提供了用戶函數在集群語境中的上下文句柄,如 ctx.SendTo 用於將數據發送至指定的目的地
// LogsHandler in 參數用於將輸入源中的數據(如有)以 bytes 的方式傳遞給函數
func LogsHandler(ctx *ofctx.OpenFunctionContext, in []byte) int {
	content := string(in)
	// 這里我們設置了三個正則表達式,分別用於匹配 HTTP 返回碼、資源命名空間、資源 Pod 名稱
	matchHTTPCode, _ := regexp.MatchString(fmt.Sprintf(" %s ", HTTPCodeNotFound), content)
	matchNamespace, _ := regexp.MatchString(fmt.Sprintf("namespace_name\":\"%s", Namespace), content)
	matchPodName := regexp.MustCompile(fmt.Sprintf(`(%s)`, PodName)).FindStringSubmatch(content)

	if matchHTTPCode && matchNamespace && matchPodName != nil {
		log.Printf("Match log - Content: %s", content)

		// 如果上述三個正則表達式同時命中,那么我們需要提取日志內容中的一些信息,用於填充至告警信息中
		// 這些信息為:404 請求的請求方式(HTTP Method)、請求路徑(HTTP Path)以及 Pod 名稱
		match := regexp.MustCompile(`([A-Z]+) (/\S*) HTTP`).FindStringSubmatch(content)
		if match == nil {
			return 500
		}
		path := match[len(match)-1]
		method := match[len(match)-2]
		podName := matchPodName[len(matchPodName)-1]

		// 收集到關鍵信息后,我們使用 altermanager 的 Data 結構體組裝告警信息
		notify := &alert.Data{
			Receiver:          "notification_manager",
			Status:            "firing",
			Alerts:            alert.Alerts{},
			GroupLabels:       alert.KV{"alertname": AlertName, "namespace": Namespace},
			CommonLabels:      alert.KV{"alertname": AlertName, "namespace": Namespace, "severity": Severity},
			CommonAnnotations: alert.KV{},
			ExternalURL:       "",
		}
		alt := alert.Alert{
			Status: "firing",
			Labels: alert.KV{
				"alertname": AlertName,
				"namespace": Namespace,
				"severity":  Severity,
				"pod":       podName,
				"path":      path,
				"method":    method,
			},
			Annotations:  alert.KV{},
			StartsAt:     time.Now(),
			EndsAt:       time.Time{},
			GeneratorURL: "",
			Fingerprint:  "",
		}
		notify.Alerts = append(notify.Alerts, alt)
		notifyBytes, _ := json.Marshal(notify)

		// 使用 ctx.SendTo 將內容發送給名為 "notification-manager" 的輸出端(你可以在之后的函數配置 logs-handler-function.yaml 中找到它的定義)
		if err := ctx.SendTo(notifyBytes, "notification-manager"); err != nil {
			panic(err)
		}
		log.Printf("Send log to notification manager.")
	}
	return 200
}

我們將這個函數上傳到代碼倉庫中,記錄代碼倉庫的地址以及代碼在倉庫中的目錄路徑,在下面的創建函數步驟中我們將使用到這兩個值。

你可以在 OpenFunction Samples 中找到這個案例。

創建函數

接下來我們將使用 OpenFunction 構建上述的函數。首先設置一個用於訪問鏡像倉庫的秘鑰文件 push-secret(在使用代碼構建出 OCI 鏡像后,OpenFunction 會將該鏡像上傳到用戶的鏡像倉庫中,用於后續的負載啟動):

REGISTRY_SERVER=https://index.docker.io/v1/ REGISTRY_USER=<your username> REGISTRY_PASSWORD=<your password>
kubectl create secret docker-registry push-secret \
    --docker-server=$REGISTRY_SERVER \
    --docker-username=$REGISTRY_USER \
    --docker-password=$REGISTRY_PASSWORD

應用函數 logs-handler-function.yaml

函數定義中包含了對兩個關鍵組件的使用:

Dapr 對應用程序屏蔽了復雜的中間件,使得 logs-handler 可以非常容易地處理 Kafka 中的事件

KEDA 通過監控消息服務器中的事件流量來驅動 logs-handler 函數的啟動,並且根據 Kafka 中消息的消費延時動態擴展 logs-handler 實例

apiVersion: core.openfunction.io/v1alpha1
kind: Function
metadata:
  name: logs-handler
spec:
  version: "v1.0.0"
  # 這里定義了構建后的鏡像的上傳路徑
  image: openfunctiondev/logs-async-handler:v1
  imageCredentials:
    name: push-secret
  build:
    builder: openfunctiondev/go115-builder:v0.2.0
    env:
      FUNC_NAME: "LogsHandler"
    # 這里定義了源代碼的路徑
    # url 為上面提到的代碼倉庫地址
    # sourceSubPath 為代碼在倉庫中的目錄路徑
    srcRepo:
      url: "https://github.com/OpenFunction/samples.git"
      sourceSubPath: "functions/OpenFuncAsync/logs-handler-function/"
  serving:
    # OpenFuncAsync 是 OpenFunction 通過 KEDA+Dapr 實現的一種由事件驅動的異步函數運行時
    runtime: "OpenFuncAsync"
    openFuncAsync:
      # 此處定義了函數的輸入(kafka-receiver)和輸出(notification-manager),與下面 components 中的定義對應關聯
      dapr:
        inputs:
          - name: kafka-receiver
            type: bindings
        outputs:
          - name: notification-manager
            type: bindings
            params:
              operation: "post"
              type: "bindings"
        annotations:
          dapr.io/log-level: "debug"
        # 這里完成了上述輸入端和輸出端的具體定義(即 Dapr Components)
        components:
          - name: kafka-receiver
            type: bindings.kafka
            version: v1
            metadata:
              - name: brokers
                value: "kafka-logs-receiver-kafka-brokers:9092"
              - name: authRequired
                value: "false"
              - name: publishTopic
                value: "logs"
              - name: topics
                value: "logs"
              - name: consumerGroup
                value: "logs-handler"
          # 此處為 KubeSphere 的 notification-manager 地址
          - name: notification-manager
            type: bindings.http
            version: v1
            metadata:
              - name: url
                value: http://notification-manager-svc.kubesphere-monitoring-system.svc.cluster.local:19093/api/v2/alerts
      keda:
        scaledObject:
          pollingInterval: 15
          minReplicaCount: 0
          maxReplicaCount: 10
          cooldownPeriod: 30
          # 這里定義了函數的觸發器,即 Kafka 服務器的 “logs” topic
          # 同時定義了消息堆積閾值(此處為 10),即當消息堆積量超過 10,logs-handler 實例個數就會自動擴展
          triggers:
            - type: kafka
              metadata:
                topic: logs
                bootstrapServers: kafka-logs-receiver-kafka-brokers.default.svc.cluster.local:9092
                consumerGroup: logs-handler
                lagThreshold: "10"

結果演示

我們先關閉 Kafka 日志接收器:在日志收集頁面,點擊進入 Kafka 日志接收器詳情頁面,然后點擊更多操作並選擇更改狀態,將其設置為關閉

停用后一段時間,我們可以觀察到 logs-handler 函數實例已經收縮到 0 了。

再將 Kafka 日志接收器激活,logs-handler 隨之啟動。

~# kubectl get po --watch
NAME                                                     READY   STATUS        RESTARTS   AGE
kafka-logs-receiver-entity-operator-568957ff84-tdrrx     3/3     Running       0          7m27s
kafka-logs-receiver-kafka-0                              1/1     Running       0          7m48s
kafka-logs-receiver-zookeeper-0                          1/1     Running       0          8m12s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          34s
strimzi-cluster-operator-687fdd6f77-kc8cv                1/1     Running       0          10m
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   2/2     Terminating   0          36s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          37s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-b9d6f   0/2     Terminating   0          38s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     Pending       0          0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          0s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   0/2     ContainerCreating   0          2s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   1/2     Running             0          4s
logs-handler-serving-kpngc-v100-zcj4q-5f46996f8c-9kj2c   2/2     Running             0          11s

接着我們向 WordPress 應用一個不存在的路徑發起請求:

curl http://<wp-svc-address>/notfound

可以看到 Slack 中已經收到了這條消息(與之對比的是,當我們正常訪問該 WordPress 站點時, Slack 中並不會收到告警消息):

進一步探索

  • 同步函數的解決方案

    為了可以正常使用 Knative Serving ,我們需要設置其網關的負載均衡器地址。(你可以使用本機地址作為 workaround)

    將下面的 "1.2.3.4" 替換為實際場景中的地址。

    kubectl patch svc -n kourier-system kourier \
    -p '{"spec": {"type": "LoadBalancer", "externalIPs": ["1.2.3.4"]}}'
    
    kubectl patch configmap/config-domain -n knative-serving \
    --type merge --patch '{"data":{"1.2.3.4.sslip.io":""}}'
    

    除了直接由 Kafka 服務器驅動函數運作(異步方式),OpenFunction 還支持使用自帶的事件框架對接 Kafka 服務器,之后以 Sink 的方式驅動 Knative 函數運作。可以參考 OpenFunction Samples 中的案例。

    在該方案中,同步函數的處理速度較之異步函數有所降低,當然我們同樣可以借助 KEDA 來觸發 Knative Serving 的 concurrency 機制,但總體而言缺乏異步函數的便捷性。(后續的階段中我們會優化 OpenFunction 的事件框架來解決同步函數這方面的缺陷)

    由此可見,不同類型的 Serverless 函數有其擅長的任務場景,如一個有序的控制流函數就需要由同步函數而非異步函數來處理。

綜述

Serverless 帶來了我們所期望的對業務場景快速拆解重構的能力。

如本案例所示,OpenFunction 不但以 Serverless 的方式提升了日志處理、告警通知鏈路的靈活度,還通過函數框架將通常對接 Kafka 時復雜的配置步驟簡化為語義明確的代碼邏輯。同時,我們也在不斷演進 OpenFunction,將在之后版本中實現由自身的 Serverless 能力驅動自身的組件運作。

本文由博客一文多發平台 OpenWrite 發布!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM