手把手帶你使用 go-kit(組件擴充,服務發現)


首先,讓我們來回顧一下我們的項目架構

// 項目結構
-| Server
----| server.go
-| EndPoint
----| endpoint.go
-| Transport
----| Transport.go
- main.go

使用外部路由組件擴充服務

我們這里使用 https://github.com/gorilla/mux 很簡單 這樣我們可以使用外部提供的關於路由的功能就可以擴充我們自己的業務邏輯結構(在Transport中修改邏輯)

// main.go
package main

import (
	EndPoint1 "Songzhibin/go-kit-demo/v0/EndPoint"
	"Songzhibin/go-kit-demo/v0/Server"
	"Songzhibin/go-kit-demo/v0/Transport"
	httpTransport "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	"net/http"
)

// 服務發布

func main() {
	// 1.先創建我們最開始定義的Server/server.go
	s := Server.Server{}

	// 2.在用EndPoint/endpoint.go 創建業務服務
	hello := EndPoint1.MakeServerEndPointHello(s)
	Bye := EndPoint1.MakeServerEndPointBye(s)

	// 3.使用 kit 創建 handler
	// 固定格式
	// 傳入 業務服務 以及 定義的 加密解密方法
	helloServer := httpTransport.NewServer(hello, Transport.HelloDecodeRequest, Transport.HelloEncodeResponse)
	sayServer := httpTransport.NewServer(Bye, Transport.ByeDecodeRequest, Transport.ByeEncodeResponse)

	//// 使用http包啟動服務
	//go http.ListenAndServe("0.0.0.0:8000", helloServer)
	//
	//go http.ListenAndServe("0.0.0.0:8001", sayServer)
	//select {}

	// https://github.com/gorilla/mux
	r := mux.NewRouter()
	// 注冊路由
	r.Handle("/hello", helloServer)
	r.Handle("/bye", sayServer)
	_ = http.ListenAndServe("0.0.0.0:8000", r)
}

運行一下看下效果


沒有問題 就可以向外提供關於REST風格的Api了

服務的注冊與發現

這里采用的是 consul 進行簡單Demo 使用其他同理更換api即可

我們使用docker進行部署 https://hub.docker.com/_/consul

// 映射 8500端口 -server 以服務的形式啟動 -boostrap 指定自己為leader,而不需要選舉 -ui 啟動一個內置管理web界面 -client 指定客戶端可以訪問的ip 0.0.0.0 為任意訪問,不設置為127.0.0.1
$ docker pull consul
$ docker run -d --name=Demo -p 8500:8500 consul agent -server -bootstrap -ui -client 0.0.0.0

https://www.consul.io/api-docs/agent/service 查看對應Api

因為我們 -ui所以直接在瀏覽器訪問 ip:8500可以看到一個啟動的web界面

我們先訪問 http://127.0.0.1:8500/v1/agent/services 這時候我們可以看到沒有任何服務

這時候我們在程序調用接口進行注冊

// main.go
package main

import (
	EndPoint1 "Songzhibin/go-kit-demo/v0/EndPoint"
	"Songzhibin/go-kit-demo/v0/Server"
	"Songzhibin/go-kit-demo/v0/Transport"
	httpTransport "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	"net/http"
)

// 服務發布

func main() {
	// 1.先創建我們最開始定義的Server/server.go
	s := Server.Server{}

	// 2.在用EndPoint/endpoint.go 創建業務服務
	hello := EndPoint1.MakeServerEndPointHello(s)
	Bye := EndPoint1.MakeServerEndPointBye(s)

	// 3.使用 kit 創建 handler
	// 固定格式
	// 傳入 業務服務 以及 定義的 加密解密方法
	helloServer := httpTransport.NewServer(hello, Transport.HelloDecodeRequest, Transport.HelloEncodeResponse)
	sayServer := httpTransport.NewServer(Bye, Transport.ByeDecodeRequest, Transport.ByeEncodeResponse)

	//// 使用http包啟動服務
	//go http.ListenAndServe("0.0.0.0:8000", helloServer)
	//
	//go http.ListenAndServe("0.0.0.0:8001", sayServer)
	//select {}

	// https://github.com/gorilla/mux
	r := mux.NewRouter()
	// 注冊路由
	r.Handle("/hello", helloServer)
	r.Handle("/bye", sayServer)

	// 因為這里要做服務發現,所以我們增加一個路由 進行心跳檢測使用
	r.Methods("GET").Path("/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-type", "application/json")
		w.Write([]byte(`{"status":"ok"}`))
	})
	_ = http.ListenAndServe("0.0.0.0:8000", r)
}

我們新增了一個路由,然后訪問試一下效果

然后我們進行服務注冊(PS:啟動consul服務這台一定要Ping通啟動業務服務這台)
我們創建一個xxx.json的文件
{
    "ID":"ServerID", // 唯一
    "Name":"ProjectName", // 不唯一
    "Tags":[
        "primary" // 可設置多Tag
    ],
    "Address": "ip", // 需要注冊服務的ip地址
    "Port": 8080, // 注冊服務的端口號
    "Check":{
        "HTTP":"url", // 剛才寫的健康監測的url
        "interval":"5s" // 探活時間
    }
}
運行指令 這里完全可以使用程序注冊哦
$ curl \
      --request PUT \
      --data @xxx.json \
      localhost:8500/v1/agent/service/register
// 取消注冊
$ curl \
      --request PUT \
      localhost:8500/v1/agent/service/deregister/注冊時候的ID
使用代碼進行服務注冊發現

https://github.com/hashicorp/consul
https://github.com/hashicorp/consul/tree/master/api // api
我們新建一個 tool目錄存放相關配置信息

// 項目結構
-| Server
----| server.go
-| EndPoint
----| endpoint.go
-| Transport
----| Transport.go
-| Tool
----| consul.go
- main.go
// Tool/consul.go
package Tool

import (
	"github.com/hashicorp/consul/api"
)

var client *api.Client
var res api.AgentServiceRegistration

// RegService:服務注冊
func RegService(address string, id string, name string, serviceIP string, servicePort int, Interval string, chickHttp string, tag ...string) error {
	// 用於客戶端的配置
	config := api.DefaultConfig()
	config.Address = address
	// 用於服務注冊
	res = api.AgentServiceRegistration{}
	res.ID = id
	res.Name = name
	res.Address = serviceIP
	res.Port = servicePort
	res.Tags = tag
	chicks := api.AgentServiceCheck{}
	// 間隔時間
	chicks.Interval = Interval
	chicks.HTTP = chickHttp
	// 賦值
	res.Check = &chicks

	// 創建客戶端
	var err error
	client, err = api.NewClient(config)
	if err != nil {
		return err
	}
	err = client.Agent().ServiceRegister(&res)
	if err != nil {
		return err
	}
	return nil
}

// LogOutServer 程序關閉后解除注冊
func LogOutServer() {
	_ = client.Agent().ServiceDeregister(res.ID)
}
// main.go
package main

import (
	EndPoint1 "Songzhibin/go-kit-demo/v0/EndPoint"
	"Songzhibin/go-kit-demo/v0/Server"
	"Songzhibin/go-kit-demo/v0/Tool"
	"Songzhibin/go-kit-demo/v0/Transport"
	"errors"
	"fmt"
	httpTransport "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

// 服務發布

func main() {
	// 1.先創建我們最開始定義的Server/server.go
	s := Server.Server{}

	// 2.在用EndPoint/endpoint.go 創建業務服務
	hello := EndPoint1.MakeServerEndPointHello(s)
	Bye := EndPoint1.MakeServerEndPointBye(s)

	// 3.使用 kit 創建 handler
	// 固定格式
	// 傳入 業務服務 以及 定義的 加密解密方法
	helloServer := httpTransport.NewServer(hello, Transport.HelloDecodeRequest, Transport.HelloEncodeResponse)
	sayServer := httpTransport.NewServer(Bye, Transport.ByeDecodeRequest, Transport.ByeEncodeResponse)

	//// 使用http包啟動服務
	//go http.ListenAndServe("0.0.0.0:8000", helloServer)
	//
	//go http.ListenAndServe("0.0.0.0:8001", sayServer)
	//select {}

	// https://github.com/gorilla/mux
	r := mux.NewRouter()
	// 注冊路由
	r.Handle("/hello", helloServer)
	r.Handle("/bye", sayServer)
	// 因為這里要做服務發現,所以我們增加一個路由 進行心跳檢測使用
	r.Methods("GET").Path("/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-type", "application/json")
		_, _ = w.Write([]byte(`{"status":"ok"}`))
	})
	// 注冊
	errChan := make(chan error)
	sign := make(chan os.Signal)
	go func() {
		err := Tool.RegService("127.0.0.1:8500", "1", "測試", "127.0.0.1", 8000, "5s", "http://192.168.8.176:8000/health", "test")
		if err != nil {
			errChan <- err
		}
		_ = http.ListenAndServe("0.0.0.0:8000", r)
	}()
	go func() {
		// 接收到信號
		signal.Notify(sign, syscall.SIGINT, syscall.SIGTERM)
		<-sign
		errChan <- errors.New("0")
	}()
	fmt.Println(<-errChan)
	Tool.LogOutServer()
}

客戶端使用服務發現調用服務

客戶端代碼架構看我上一篇 客戶端直連的 博客
主要修改的地方就是Client/client.go 新增了一個方法

// Client/client.go
package Client

import (
	"context"
	"errors"
	"fmt"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/consul"
	"github.com/go-kit/kit/transport/http"
	"github.com/hashicorp/consul/api"
	"io"
	"math/rand"
	"net/url"
	"os"
	"strings"
)

// Direct: 直接調用服務端
// method:方法 fullUrl: 完整的url http://localhost:8000
// enc: http.EncodeRequestFunc dec: http.DecodeResponseFunc 這兩個函數具體等一下會在Transport中進行詳細解釋
// requestStruct: 根據EndPoint定義的request結構體傳參
func Direct(method string, fullUrl string, enc http.EncodeRequestFunc, dec http.DecodeResponseFunc, requestStruct interface{}) (interface{}, error) {
	// 1.解析url
	target, err := url.Parse(fullUrl)
	if err != nil {
		fmt.Println(err)
		return nil, err
	}
	// kit調用服務端拿到Client對象
	client := http.NewClient(strings.ToUpper(method), target, enc, dec)
	// 調用服務 client.Endpoint()返回一個可執行函數 傳入context 和 請求數據結構體
	return client.Endpoint()(context.Background(), requestStruct)
}

// ServiceDiscovery: 通過服務發現的形式調用服務
// registryAddress: 注冊中心的地址
// servicesName: 注冊的服務名稱
// tags: 可用標簽
// passingOnly: true 只返回通過健康監測的實例
// method:方法
// enc: http.EncodeRequestFunc dec: http.DecodeResponseFunc 這兩個函數具體等一下會在Transport中進行詳細解釋
// requestStruct: 根據EndPoint定義的request結構體傳參
func ServiceDiscovery(method string, registryAddress string, enc http.EncodeRequestFunc, dec http.DecodeResponseFunc, requestStruct interface{}, servicesName string, passingOnly bool, tags ...string) (interface{}, error) {
	// 1.通過consul api創建一個client, 使用go-kit sd 封裝一個專門的client 用於獲取服務對象
	config := api.DefaultConfig()
	// registryAddress 注冊中心的地址
	config.Address = registryAddress

	// 這里是拿到 consul 中的client
	apiClient, err := api.NewClient(config)
	if err != nil {
		return nil, err
	}
	// kit封裝client
	client := consul.NewClient(apiClient)

	logger := log.NewLogfmtLogger(os.Stdout)

	// 創建實例
	// logger 可以使用自己的logger對象 例如zap等
	// instances: 實例對象
	instances := consul.NewInstancer(client, logger, servicesName, tags, passingOnly)

	// f: Factory
	// servicesUrl: 傳入服務的url, 通過這個url根據直連一樣去獲取endpoint.Endpoint對象
	f := func(servicesUrl string) (endpoint.Endpoint, io.Closer, error) {
		// 解析url
		target, err := url.Parse("http://" + servicesUrl)
		if err != nil {
			return nil, nil, err
		}
		return http.NewClient(strings.ToUpper(method), target, enc, dec).Endpoint(), nil, nil
	}

	// 獲取endpoint可執行對象 與我們直連client.Endpoint()返回的一樣
	// 傳入 instances: 實例對象  Factory:工廠模式  logger: 日志對象
	endpointer := sd.NewEndpointer(instances, f, logger)

	// 獲取所有實例 endpoints
	endpoints, err := endpointer.Endpoints()
	if err != nil {
		return nil, err
	}
	// 隨機選擇一個執行
	l := len(endpoints)
	if l == 0 {
		return nil, errors.New("len(endpoints) == 0")
	}
	return endpoints[rand.Intn(len(endpoints))](context.Background(), requestStruct)
}

我們調用運行一下

// main.go
package main

import (
	"Songzhibin/go-kit-demo/v0client/Client"
	"Songzhibin/go-kit-demo/v0client/EndPoint"
	"Songzhibin/go-kit-demo/v0client/Transport"
	"fmt"
)

// 調用我們在client封裝的函數就好了
func main() {
	//i, err := Client.Direct("GET", "http://127.0.0.1:8000", Transport.ByeEncodeRequestFunc, Transport.ByeDecodeResponseFunc, EndPoint.HelloRequest{Name: "songzhibin"})
	i, err := Client.ServiceDiscovery("GET", "http://127.0.0.1:8500", Transport.ByeEncodeRequestFunc, Transport.ByeDecodeResponseFunc, EndPoint.HelloRequest{Name: "songzhibin"}, "測試", true, "test")
	if err != nil {
		fmt.Println(err)
		return
	}
	res, ok := i.(EndPoint.HelloResponse)
	if !ok {
		fmt.Println("no ok")
		return
	}
	fmt.Println(res)
}

中間件插入

提到中間件大家肯定很熟悉了,在kit中也是一樣的采用非侵入式,可定制性非常高,我們來看一下
Ps:中間件的使用在EndPoint中插入
kit中標准中間件函數簽名如下

func middleWare(... 自定義參數) endpoint.Middleware
// EndPoint/endpoint.go
package EndPoint

import (
	"Songzhibin/go-kit-demo/v0/Server"
	"context"
	"errors"
	"github.com/go-kit/kit/endpoint"
	"github.com/juju/ratelimit"
	"time"
)

// endpoint.go 定義 Request、Response 格式, 並且可以使用閉包來實現各種中間件的嵌套
// 這里了解 protobuf 的比較好理解點
// 就是聲明 接收數據和響應數據的結構體 並通過構造函數創建 在創建的過程當然可以使用閉包來進行一些你想要的操作啦

// 這里根據我們Demo來創建一個響應和請求
// 當然你想怎么創建怎么創建 也可以共用 這里我分開寫 便於大家看的清楚

// Hello 業務使用的請求和響應格式
// HelloRequest 請求格式
type HelloRequest struct {
	Name string `json:"name"`
}

// HelloResponse 響應格式
type HelloResponse struct {
	Reply string `json:"reply"`
}

// Bye 業務使用的請求和響應格式
// ByeRequest 請求格式
type ByeRequest struct {
	Name string `json:"name"`
}

// ByeResponse 響應格式
type ByeResponse struct {
	Reply string `json:"reply"`
}

// ------------ 當然 也可以通用的寫 ----------
// Request 請求格式
type Request struct {
	Name string `json:"name"`
}

// Response 響應格式
type Response struct {
	Reply string `json:"reply"`
}

type Bucket struct {
	*ratelimit.Bucket
}

// 限流桶
var B Bucket = Bucket{ratelimit.NewBucket(time.Hour, 2)}

// Limiting: 限流
func (b Bucket) Limiting() bool {
	return b.TakeAvailable(1) > 0
}

// MiddleWare: 中間件示例
// 中間件標准函數簽名 func middleWare(... 自定義參數) endpoint.Middleware
func MiddleWare(b Bucket) endpoint.Middleware {
	return func(e endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			// 中間件處理
			if !b.Limiting() {
				return nil, errors.New("false")
			}
			// 如果成功通過就進到下一層
			return e(ctx, request)
		}
	}
}

// 這里創建構造函數 hello方法的業務處理
// MakeServerEndPointHello 創建關於業務的構造函數
// 傳入 Server/server.go 定義的相關業務接口
// 返回 go-kit/endpoint.Endpoint (實際上就是一個函數簽名)
func MakeServerEndPointHello(s Server.IServer) endpoint.Endpoint {
	// 這里使用閉包,可以在這里做一些中間件業務的處理
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		// request 是在對應請求來時傳入的參數(這里的request 實際上是等下我們要將的Transport中一個decode函數中處理獲得的參數)
		// 這里進行以下斷言
		r, ok := request.(HelloRequest)
		if !ok {
			return Response{}, nil
		}
		// 這里實際上就是調用我們在Server/server.go中定義的業務邏輯
		// 我們拿到了 Request.Name 那么我們就可以調用我們的業務 Server.IServer 中的方法來處理這個數據並返回
		// 具體的業務邏輯具體定義....
		return HelloResponse{Reply: s.Hello(r.Name)}, nil
		// response 這里返回的response 可以返回任意的 不過根據規范是要返回我們剛才定義好的返回對象

	}
}

// 這里創建構造函數 Bye方法的業務處理
// MakeServerEndPointBye 創建關於業務的構造函數
// 傳入 Server/server.go 定義的相關業務接口
// 返回 go-kit/endpoint.Endpoint (實際上就是一個函數簽名)
func MakeServerEndPointBye(s Server.IServer) endpoint.Endpoint {
	// 這里使用閉包,可以在這里做一些中間件業務的處理
	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
		// request 是在對應請求來時傳入的參數(這里的request 實際上是等下我們要將的Transport中一個decode函數中處理獲得的參數)
		// 這里進行以下斷言
		r, ok := request.(ByeRequest)
		if !ok {
			return Response{}, nil
		}
		// 這里實際上就是調用我們在Server/server.go中定義的業務邏輯
		// 我們拿到了 Request.Name 那么我們就可以調用我們的業務 Server.IServer 中的方法來處理這個數據並返回
		// 具體的業務邏輯具體定義....
		return ByeResponse{Reply: s.Bye(r.Name)}, nil
		// response 這里返回的response 可以返回任意的 不過根據規范是要返回我們剛才定義好的返回對象
	}
}

在mian.go中也做了一下嵌套

// main.go
package main

import (
	EndPoint1 "Songzhibin/go-kit-demo/v0/EndPoint"
	"Songzhibin/go-kit-demo/v0/Server"
	"Songzhibin/go-kit-demo/v0/Tool"
	"Songzhibin/go-kit-demo/v0/Transport"
	"errors"
	"fmt"
	httpTransport "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	"net/http"
	"os"
	"os/signal"
	"syscall"
)

// 服務發布

func main() {
	// 1.先創建我們最開始定義的Server/server.go
	s := Server.Server{}

	// 2.在用EndPoint/endpoint.go 創建業務服務

	hello := EndPoint1.MakeServerEndPointHello(s)

	// 加入中間件
	Bye := EndPoint1.MiddleWare(EndPoint1.B)(EndPoint1.MakeServerEndPointBye(s))

	// 3.使用 kit 創建 handler
	// 固定格式
	// 傳入 業務服務 以及 定義的 加密解密方法
	helloServer := httpTransport.NewServer(hello, Transport.HelloDecodeRequest, Transport.HelloEncodeResponse)
	sayServer := httpTransport.NewServer(Bye, Transport.ByeDecodeRequest, Transport.ByeEncodeResponse)

	//// 使用http包啟動服務
	//go http.ListenAndServe("0.0.0.0:8000", helloServer)
	//
	//go http.ListenAndServe("0.0.0.0:8001", sayServer)
	//select {}

	// https://github.com/gorilla/mux
	r := mux.NewRouter()
	// 注冊路由
	r.Handle("/hello", helloServer)
	r.Handle("/bye", sayServer)
	// 因為這里要做服務發現,所以我們增加一個路由 進行心跳檢測使用
	r.Methods("GET").Path("/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-type", "application/json")
		_, _ = w.Write([]byte(`{"status":"ok"}`))
	})
	// 注冊
	errChan := make(chan error)
	sign := make(chan os.Signal)
	go func() {
		err := Tool.RegService("127.0.0.1:8500", "1", "測試", "127.0.0.1", 8000, "5s", "http://10.43.1.106:8000/health", "test")
		if err != nil {
			errChan <- err
		}
		_ = http.ListenAndServe("0.0.0.0:8000", r)
	}()
	go func() {
		// 接收到信號
		signal.Notify(sign, syscall.SIGINT, syscall.SIGTERM)
		<-sign
		errChan <- errors.New("0")
	}()
	fmt.Println(<-errChan)
	Tool.LogOutServer()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM