go標准庫的學習-net/rpc


參考:https://studygolang.com/pkgdoc

導入方法:

import "net/rpc"

RPC(Remote Procedure Call Protocol)就是想實現函數調用模式的網絡化,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。

客戶端就像調用本地函數一樣,然后客戶端把這些參數打包之后通過網絡傳給服務端,服務端解包到處理過程中執行,然后執行結果返回給客戶端

運行時一次客戶機對服務器的RPC調用步驟有:

  • 調用客戶端句柄,執行傳送參數
  • 調用本地系統內核發送網絡信息
  • 消息傳送到遠程主機
  • 服務器句柄得到消息並取得參數
  • 執行遠程過程
  • 執行的過程將結果返回服務器句柄
  • 服務器句柄返回結果,調用遠程系統內核
  • 消息傳回本地主機
  • 客戶句柄由內核接收消息
  • 客戶接收句柄返回的數據

Go標准包中已經提供了對RPC的支持,支持三個級別的RPC:TCP、HTTP、JSONRPC,下面將一一說明

Go的RPC包與傳統的RPC系統不同,他只支持Go開發的服務器與客戶端之間的交互,因為在內部,它們采用了Gob來編碼

Go RPC的函數要滿足下面的條件才能夠被遠程調用,不然會被忽略:

  • 函數必須是導出的,即首字母為大寫
  • 必須有兩個導出類型的參數
  • 第一個參數是接收的參數,第二個參數是返回給客戶端的參數,第二個參數必須是指針類型的
  • 函數還要有一個error類型返回值。方法的返回值,如果非nil,將被作為字符串回傳,在客戶端看來就和errors.New創建的一樣。如果返回了錯誤,回復的參數將不會被發送給客戶端。

舉個例子,正確的RPC函數格式為:

func (t *T) MethidName(argType T1, replyType *T2) error

T、T1和T2類型都必須能被encoding/gob包編解碼

任何RPC都需要通過網絡來傳遞數據,Go RPC可以利用HTTP和TCP來傳遞數據

Constants

const (
    // HandleHTTP使用的默認值
    DefaultRPCPath   = "/_goRPC_"
    DefaultDebugPath = "/debug/rpc"
)

Variables 

var DefaultServer = NewServer()

DefaultServer是*Server的默認實例,本包和Server方法同名的函數都是對其方法的封裝。

type Server

type Server struct {
    // 內含隱藏或非導出字段
}

Server代表RPC服務端。

func NewServer

func NewServer() *Server

NewServer創建並返回一個*Server。

func (*Server) Register

func (server *Server) Register(rcvr interface{}) error

Register在server注冊並公布rcvr的方法集中滿足如下要求的方法:

- 方法是導出的
- 方法有兩個參數,都是導出類型或內建類型
- 方法的第二個參數是指針
- 方法只有一個error接口類型的返回值

如果rcvr不是一個導出類型的值,或者該類型沒有滿足要求的方法,Register會返回錯誤。Register也會使用log包將錯誤寫入日志。客戶端可以使用格式為"Type.Method"的字符串訪問這些方法,其中Type是rcvr的具體類型。

func (*Server) RegisterName

func (server *Server) RegisterName(name string, rcvr interface{}) error

RegisterName類似Register,但使用提供的name代替rcvr的具體類型名作為服務名。

func Register

func Register(rcvr interface{}) error

Register在DefaultServer注冊並公布rcvr的方法。

其實就相當於調用NewServer函數生成一個*Server,然后再調用其的(*Server) Register函數

func HandleHTTP

func HandleHTTP()

HandleHTTP函數注冊DefaultServer的RPC信息HTTP處理器對應到DefaultRPCPath,和DefaultServer的debug處理器對應到DefaultDebugPath。HandleHTTP函數會注冊到http.DefaultServeMux。之后,仍需要調用http.Serve(),一般會另開線程:"go http.Serve(l, nil)"

其實就相當於調用NewServer函數生成一個*Server,然后再調用其的(*Server) HandleHTTP函數

 

func DialHTTP

func DialHTTP(network, address string) (*Client, error)

DialHTTP在指定的網絡和地址與在默認HTTP RPC路徑監聽的HTTP RPC服務端連接。

func DialHTTPPath

func DialHTTPPath(network, address, path string) (*Client, error)

DialHTTPPath在指定的網絡、地址和路徑與HTTP RPC服務端連接。

上面兩個函數都是通過HTTP的方式和服務器建立連接,之間的區別之在於是否設置上下文路徑。

type Client

type Client struct {
    codec ClientCodec

    reqMutex sync.Mutex // protects following
    request  Request

    mutex    sync.Mutex // protects following
    seq      uint64
    pending  map[uint64]*Call
    closing  bool // user has called Close
    shutdown bool // server has told us to stop
}

Client類型代表RPC客戶端。同一個客戶端可能有多個未返回的調用,也可能被多個go程同時使用。

func (*Client) Call

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error

Call調用指定的方法,等待調用返回,將結果寫入reply,然后返回執行的錯誤狀態。

有三個參數,第一個要寫調用函數的名字,第二個是要傳遞的參數,第三個是要返回的參數(這個注意是指針類型)

舉例:

HTTP RPC

利用HTTP的好處是可以直接復用net/http中的一些函數,下面舉例說明:

服務端:

package main

import (
    "fmt" "net/http" "net/rpc" "errors" ) type Args struct{ A, B int } type Quotient struct{ Quo, Rem int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error{ *reply = args.A * args.B return nil } func (t *Arith) Divide(args *Args, quo *Quotient) error{ if args.B == 0{ return errors.New("divide by zero") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { arith := new(Arith) rpc.Register(arith) rpc.HandleHTTP() err := http.ListenAndServe(":1234", nil) if err != nil{ fmt.Println(err.Error()) } }

客戶端:

package main

import (
    "fmt" "net/rpc" "log" "os" ) type Args struct{ A, B int } type Quotient struct{ Quo, Rem int } func main() { if len(os.Args) != 2{ fmt.Println("Usage: ", os.Args[0], "server") os.Exit(1) } serverAddress := os.Args[1] client, err := rpc.DialHTTP("tcp", serverAddress + ":1234") if err != nil{ log.Fatal("dialing : ", err) } //Synchronous call args := Args{17, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil{ log.Fatal("arith error : ", err) } fmt.Printf("Arith: %d*%d = %d \n", args.A, args.B, reply) var quot Quotient err = client.Call("Arith.Divide", args, &quot) if err != nil{ log.Fatal("arith error : ", err) } fmt.Printf("Arith: %d/%d = %d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem) }

客戶端返回:

userdeMBP:go-learning user$ go run test.go
Usage:  /var/folders/2_/g5wrlg3x75zbzyqvsd5f093r0000gn/T/go-build438875911/b001/exe/test server exit status 1 userdeMBP:go-learning user$ go run test.go 127.0.0.1 Arith: 17*8 = 136 Arith: 17/8 = 2 remainder 1

 

TCP RPC連接

func Dial

func Dial(network, address string) (*Client, error)

Dial在指定的網絡和地址與RPC服務端連接。

func (*Server) ServeConn

func (server *Server) ServeConn(conn io.ReadWriteCloser)

ServeConn在單個連接上執行server。ServeConn會阻塞,服務該連接直到客戶端掛起。調用者一般應另開線程調用本函數:"go server.ServeConn(conn)"。ServeConn在該連接使用gob(參見encoding/gob包)有線格式。要使用其他的編解碼器,可調用ServeCodec方法。

func ServeConn

func ServeConn(conn io.ReadWriteCloser)

ServeConn在單個連接上執行DefaultServer。ServeConn會阻塞,服務該連接直到客戶端掛起。調用者一般應另開線程調用本函數:"go ServeConn(conn)"。ServeConn在該連接使用gob(參見encoding/gob包)有線格式。要使用其他的編解碼器,可調用ServeCodec方法。

其實就相當於調用NewServer函數生成一個*Server,然后再調用其的(*Server) ServeConn函數

 

JSON RPC連接

服務端:

package main

import (
    "fmt" "net" "net/rpc" "net/rpc/jsonrpc" "errors" "os" ) type Args struct{ A, B int } type Quotient struct{ Quo, Rem int } type Arith int func (t *Arith) Multiply(args *Args, reply *int) error{ *reply = args.A * args.B return nil } func (t *Arith) Divide(args *Args, quo *Quotient) error{ if args.B == 0{ return errors.New("divide by zero") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { arith := new(Arith) rpc.Register(arith) tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")//jsonrpc是基於TCP協議的,現在他還不支持http協議 if err != nil{ fmt.Println(err.Error()) os.Exit(1) } listener, err := net.ListenTCP("tcp", tcpAddr) if err != nil{ fmt.Println(err.Error()) os.Exit(1) } for{ conn, err := listener.Accept() if err != nil{ continue } jsonrpc.ServeConn(conn) } }

客戶端:

package main

import (
    "fmt" "net/rpc/jsonrpc" "log" "os" ) type Args struct{ A, B int } type Quotient struct{ Quo, Rem int } func main() { if len(os.Args) != 2{ fmt.Println("Usage: ", os.Args[0], "server:port") os.Exit(1) } service := os.Args[1] client, err := jsonrpc.Dial("tcp", service) if err != nil{ log.Fatal("dialing : ", err) } //Synchronous call args := Args{17, 8} var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil{ log.Fatal("arith error : ", err) } fmt.Printf("Arith: %d*%d = %d \n", args.A, args.B, reply) var quot Quotient err = client.Call("Arith.Divide", args, &quot) if err != nil{ log.Fatal("arith error : ", err) } fmt.Printf("Arith: %d/%d = %d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem) }

客戶端返回:

userdeMBP:go-learning user$ go run test.go 127.0.0.1:1234 Arith: 17*8 = 136 Arith: 17/8 = 2 remainder 1

 

 

type Request

type Request struct {
    ServiceMethod string // 格式:"Service.Method"
    Seq           uint64 // 由客戶端選擇的序列號
    // 內含隱藏或非導出字段
}

Request是每個RPC調用請求的頭域。它是被內部使用的,這里的文檔用於幫助debug,如分析網絡擁堵時。

type Response

type Response struct {
    ServiceMethod string // 對應請求的同一字段
    Seq           uint64 // 對應請求的同一字段
    Error         string // 可能的錯誤
    // 內含隱藏或非導出字段
}

Response是每個RPC調用回復的頭域。它是被內部使用的,這里的文檔用於幫助debug,如分析網絡擁堵時。

type ClientCodec

type ClientCodec interface {
    // 本方法必須能安全的被多個go程同時使用
    WriteRequest(*Request, interface{}) error
    ReadResponseHeader(*Response) error
    ReadResponseBody(interface{}) error
    Close() error
}

ClientCodec接口實現了RPC會話的客戶端一側RPC請求的寫入和RPC回復的讀取。客戶端調用WriteRequest來寫入請求到連接,然后成對調用ReadRsponseHeader和ReadResponseBody以讀取回復。客戶端在結束該連接的事務時調用Close方法。ReadResponseBody可以使用nil參數調用,以強制回復的主體被讀取然后丟棄。

func NewClient

func NewClient(conn io.ReadWriteCloser) *Client

NewClient返回一個新的Client,以管理對連接另一端的服務的請求。它添加緩沖到連接的寫入側,以便將回復的頭域和有效負載作為一個單元發送。

func NewClientWithCodec

func NewClientWithCodec(codec ClientCodec) *Client

NewClientWithCodec類似NewClient,但使用指定的編解碼器,以編碼請求主體和解碼回復主體。

NewClient使用默認編碼gobClientCodec,NewClientWithCodec使用自定義的其它編碼。

默認的gobClientCodec代碼為:

type gobClientCodec struct {
    rwc    io.ReadWriteCloser
    dec    *gob.Decoder
    enc    *gob.Encoder
    encBuf *bufio.Writer
}

//指定客戶端將以什么樣的編碼方式將信息發送給服務端,在調用Go和Call函數是會自動使用WriteRequest方法發送信息 func (c
*gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { //Encode方法將request r編碼后發送 return } if err = c.enc.Encode(body); err != nil {//Encode方法將body編碼后發送 return } return c.encBuf.Flush() } func (c *gobClientCodec) ReadResponseHeader(r *Response) error { return c.dec.Decode(r)////Decode從輸入流讀取下一個之並將該值存入response r } func (c *gobClientCodec) ReadResponseBody(body interface{}) error { return c.dec.Decode(body) //Decode從輸入流讀取下一個之並將該值存入body } func (c *gobClientCodec) Close() error { return c.rwc.Close() }

創建Client時將調用默認成對調用ReadRsponseHeader和ReadResponseBody以讀取回復

舉例自定義編碼:

 

package main

import (
    "log"
    "net/rpc"
    // "errors"
    "fmt"
)
type shutdownCodec struct {
    responded chan int
    closed    bool
}

func (c *shutdownCodec) WriteRequest(*rpc.Request, interface{}) error {//這是client用來發送請求的方法
    fmt.Println("call WriteRequest")
    return nil 
}
func (c *shutdownCodec) ReadResponseBody(interface{}) error{
    fmt.Println("call ReadResponseBody")
    return nil 
}
func (c *shutdownCodec) ReadResponseHeader(*rpc.Response) error {
    c.responded <- 1 //如果注釋掉這里,則會一直卡在"wait response : "
    return nil
    // return errors.New("shutdownCodec ReadResponseHeader") //如果返回的是error,那么就不會去調用ReadResponseBody了
}
func (c *shutdownCodec) Close() error {
    c.closed = true
    return nil
}

func main() {
    codec := &shutdownCodec{responded: make(chan int)}
    client := rpc.NewClientWithCodec(codec)
    fmt.Println("wait response : ") //從返回結果可以看出來,NewClientWithCodec后會自動成對調用ReadResponseBody和ReadResponseHeader
    fmt.Println(<-codec.responded)
    fmt.Println(codec.closed) //false
    client.Close()
    if !codec.closed {
        log.Fatal("client.Close did not close codec")
    }
    fmt.Println(codec.closed) //true
}

 

 

type ServerCodec

type ServerCodec interface {
    ReadRequestHeader(*Request) error
    ReadRequestBody(interface{}) error
    // 本方法必須能安全的被多個go程同時使用
    WriteResponse(*Response, interface{}) error
    Close() error
}

ServerCodec接口實現了RPC會話的服務端一側RPC請求的讀取和RPC回復的寫入。服務端通過成對調用方法ReadRequestHeader和ReadRequestBody從連接讀取請求,然后調用WriteResponse來寫入回復。服務端在結束該連接的事務時調用Close方法。ReadRequestBody可以使用nil參數調用,以強制請求的主體被讀取然后丟棄。

默認的gobServerCodec:

type gobServerCodec struct {
    rwc    io.ReadWriteCloser
    dec    *gob.Decoder
    enc    *gob.Encoder
    encBuf *bufio.Writer
    closed bool
}

func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
    return c.dec.Decode(r) //解碼並讀取client發來的請求request
}

func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
    return c.dec.Decode(body) //解碼並讀取client發來的請求body
}

//指定服務端將會以什么樣的編碼方式將數據返回給客戶端,在調用ServeRequest和ServeCodec方法時會自動調用該WriteResponse函數 func (c
*gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { //對響應信息request編碼 if c.encBuf.Flush() == nil { // Gob couldn't encode the header. Should not happen, so if it does, // shut down the connection to signal that the connection is broken. log.Println("rpc: gob error encoding response:", err) c.Close() } return } if err = c.enc.Encode(body); err != nil {//對響應信息body編碼 if c.encBuf.Flush() == nil { // Was a gob problem encoding the body but the header has been written. // Shut down the connection to signal that the connection is broken. log.Println("rpc: gob error encoding body:", err) c.Close() } return } return c.encBuf.Flush() } func (c *gobServerCodec) Close() error { if c.closed { // Only call c.rwc.Close once; otherwise the semantics are undefined. return nil } c.closed = true return c.rwc.Close() }

func (*Server) ServeCodec

func (server *Server) ServeCodec(codec ServerCodec)

ServeCodec類似ServeConn,但使用指定的編解碼器,以編碼請求主體和解碼回復主體。

func (*Server) ServeRequest

func (server *Server) ServeRequest(codec ServerCodec) error

ServeRequest類似ServeCodec,但異步的服務單個請求。它不會在調用結束后關閉codec。

下面兩個函數的不同在於他們使用在DefaultServer上:

func ServeCodec

func ServeCodec(codec ServerCodec)

ServeCodec類似ServeConn,但使用指定的編解碼器,以編碼請求主體和解碼回復主體。

func ServeRequest

func ServeRequest(codec ServerCodec) error

ServeRequest類似ServeCodec,但異步的服務單個請求。它不會在調用結束后關閉codec。

 

func (*Client) Close

func (client *Client) Close() error

func (*Server) Accept

func (server *Server) Accept(lis net.Listener)

Accept接收監聽器l獲取的連接,然后服務每一個連接。Accept會阻塞,調用者應另開線程:"go server.Accept(l)"

舉例:

package main

import (
    "log"
    "net"
    "net/rpc"
    "strings"
    "fmt"
)
type R struct {
    // Not exported, so R does not work with gob.
    // 所以這樣運行的話會報錯rpc: gob error encoding body: gob: type main.R has no exported fields
    // msg []byte 
    Msg []byte //改成這樣
}

type S struct{}

func (s *S) Recv(nul *struct{}, reply *R) error {
    *reply = R{[]byte("foo")}
    return nil
}

func main() {
    defer func() {
        err := recover()
        if err == nil {
            log.Fatal("no error")
        }
        if !strings.Contains(err.(error).Error(), "reading body EOF") {
            log.Fatal("expected `reading body EOF', got", err)
        }
    }()
    //服務端
    rpc.Register(new(S))

    listen, err := net.Listen("tcp", "127.0.0.1:1234")//端口為0表示任意端口
    if err != nil {
        panic(err)
    }
    go rpc.Accept(listen) //必須用並發監聽,因為客戶端和服務端寫在了一起 //客戶端
    client, err := rpc.Dial("tcp", listen.Addr().String())
    if err != nil {
        panic(err)
    }

    var reply R
    err = client.Call("S.Recv", &struct{}{}, &reply)
    if err != nil {
        panic(err)
    }

    fmt.Printf("%q\n", reply)
    
    client.Close()
    listen.Close()
    
}

返回:

userdeMBP:go-learning user$ go run test.go
{"foo"}
2019/02/28 15:05:07 rpc.Serve: accept:accept tcp 127.0.0.1:1234: use of closed network connection
2019/02/28 15:05:07 no error
exit status 1

 

 

type Call

 

type Call struct {
    ServiceMethod string      // 調用的服務和方法的名稱
    Args          interface{} // 函數的參數(下層為結構體指針)
    Reply         interface{} // 函數的回復(下層為結構體指針)
    Error         error       // 在調用結束后,保管錯誤的狀態
    Done          chan *Call  // 對其的接收操作會阻塞,直到遠程調用結束
}

 

Call類型代表一個執行中/執行完畢的RPC會話。

func (*Client) Go

func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call

Go異步的調用函數。本方法Call結構體類型指針的返回值代表該次遠程調用。通道類型的參數done會在本次調用完成時發出信號(通過返回本次Go方法的返回值)。如果done為nil,Go會申請一個新的通道(寫入返回值的Done字段);如果done非nil,done必須有緩沖,否則Go方法會故意崩潰。

舉例:

使用的仍是上面TCP RPC的服務端,客戶端改為:

package main

import (
    "fmt"
    "net/rpc"
    "log"
    "os"
)
type Args struct{
    A, B int
}

type Quotient struct{
    Quo, Rem int
}

func main() {
    if len(os.Args) != 2{
        fmt.Println("Usage: ", os.Args[0], "server:port")
        os.Exit(1)
    }
    service := os.Args[1]

    client, err := rpc.Dial("tcp", service)
    if err != nil{
        log.Fatal("dialing : ", err)
    }

    //Synchronous call
    args := Args{17, 8}
    var reply int
    done := make(chan *rpc.Call, 1)

    call := client.Go("Arith.Multiply", args, &reply, done) //異步調用,只有當該方法執行完畢后done的值才不為nil
    if call.Error != nil {
        log.Fatal(err)
    }

    
    if resultCall := <-done; resultCall != nil {//如果不<-done,reply將不會有結果,reply將為0
        fmt.Printf("Arith: %d*%d = %d \n", args.A, args.B, reply)
        fmt.Printf("done : %#v\n", resultCall)
        fmt.Printf("Multiply result : %#v\n", *(resultCall.Reply.(*int)))//根據Call的Reply得到返回的值
    }
    
    var quot Quotient
    err = client.Call("Arith.Divide", args, &quot)
    if err != nil{
        log.Fatal("arith error : ", err)
    }
    fmt.Printf("Arith: %d/%d = %d remainder %d\n", args.A, args.B, quot.Quo, quot.Rem)   

}

返回:

userdeMBP:go-learning user$ go run test.go 127.0.0.1:1234
Arith: 17*8 = 136 
done : &rpc.Call{ServiceMethod:"Arith.Multiply", Args:main.Args{A:17, B:8}, Reply:(*int)(0xc000012100), Error:error(nil), Done:(chan *rpc.Call)(0xc0001142a0)}
Multiply result : 136
Arith: 17/8 = 2 remainder 1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM