RPC 通信过程
由服务提供者给出业务接口声明,在调用方的程序里面,RPC 框架根据调用的服务接口提前生成动态代理实现类,并通过依赖注入等技术注入到声明了该接口的相关业务逻辑里面。该代理实现类会拦截所有的方法调用,在提供的方法处理逻辑里面完成一整套的远程调用,并把远程调用结果返回给调用方,这样调用方在调用远程方法的时候就获得了像调用本地接口一样的体验。
Go
语言的 RPC
包的路径为 net/rpc
,也就是放在了 net
包目录下面。因此我们可以猜测该 RPC
包是建立在 net
包基础之上的。
所以先简单的实现一个 demo
版本的 rpc
服务。
RPC “Hello, World” server 端 1 2 3 4 5 6 type HelloService struct {}func (p *HelloService) Hello(request string , reply *string ) error { *reply = "hello:" + request return nil }
根据 Go
语言的 RPC
规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个 error
类型,同时必须是公开的方法。
然后就可以将 HelloService
类型的对象注册为一个 RPC
服务:
1 2 3 4 5 6 7 8 9 10 func main () { rpc.RegisterName("HelloService" , new (HelloService)) listener, err := net.Listen("tcp" , ":1234" ) if err != nil { log.Fatal("ListenTCP error:" , err) } rpc.Accept(listener) }
其中 rpc.Register
函数调用会将对象类型中所有满足 RPC
规则的对象方法注册为 RPC
函数 ,所有注册的方法会放在 “HelloService” 服务空间之下。然后我们建立一个唯一的 TCP
连接,并且通过 rpc.ServeConn
函数在该 TCP
连接上为对方提供 RPC
服务。
代码分析 RegisterName 分析 1 rpc.RegisterName("HelloService" , new (HelloService))
这行是注册服务到 rpc
中。跳转到源码中查看:
首先看 DefaultServer
:
默认是 Server
结构体:
rpc.RegisterName
调用的实际上就是 Server.RegisterName
;而 RegisterName
方法最终调用的是 register
不可导出的方法:
看下 register
函数:
逻辑很简单:
实例化一个 service
,并将rpc
服务类型和值赋值给 service
。
通过 suitableMethods
函数获取服务满足 rpc
要求的 method
method
必须是导出的。
必须要有三个部分:receiver
,*args
,*reply
。
第一个参数不能是指针类型的。
第二个参数必须是指针类型的。
*reply
这个参数必须是可以内置或者可导出的类型,比如string
、int
、struct
,不能写一个不能导出的类型。
返回参数必须是一个,而且必须是Error
类型。
然后将方法和注册的服务名存到 Server
(注意不是server
)的 serviceMap
字段上。
接着开始启动一个tcp
服务,监听端口,Accept
请求并处理。
Accept 分析 Accept
方法最终调用的是 Server
的 Accept
:
主要是一个不断接受新的请求连接的for
循环 ,一旦监听器接收了一个连接,之后为每个连接开一个go
协程调用ServerConn
进行处理。
ServerConn
方法也很简单,首先构建一个Codec
结构体,去处理RPC
协议,参数包括连接、序列化反序列化方法以及Writer
,标准库的默认序列化方式为gobServeCodec
。
golang
官方还提供了net/rpc/jsonrpc
库实现RPC
方法,JSON RPC
采用JSON
进行数据编解码,因而支持跨语言调用,后面介绍。
在这里我们可以看到server.ServeCodec
方法的参数为一个接口, 如果你要自己实现一个rpc
协议的话,只需要实现ServerCodec
接口对应的的方法就可以进行个性化开发了。
默认是使用 gobServerCodec
来实现ServerCodec
接口的,所以主要看下 gobServerCodec
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 type gobServerCodec struct { rwc io.ReadWriteCloser dec *gob.Decoder enc *gob.Encoder encBuf *bufio.Writer closed bool }func (c *gobServerCodec) ReadRequestHeader(r *Request) error { return c.dec.Decode(r) }func (c *gobServerCodec) ReadRequestBody(body any) error { return c.dec.Decode(body) }func (c *gobServerCodec) WriteResponse(r *Response, body any) (err error ) { if err = c.enc.Encode(r); err != nil { if c.encBuf.Flush() == nil { log.Println("rpc: gob error encoding response:" , err) c.Close() } return } if err = c.enc.Encode(body); err != nil { if c.encBuf.Flush() == nil { log.Println("rpc: gob error encoding body:" , err) c.Close() } return } return c.encBuf.Flush() }func (c *gobServerCodec) Close() error { if c.closed { return nil } c.closed = true return c.rwc.Close() }
主要看下 ServeCodec
方法,因为真正处理的就是这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (server *Server) ServeCodec(codec ServerCodec) { sending := new (sync.Mutex) wg := new (sync.WaitGroup) for { service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) if err != nil { if debugLog && err != io.EOF { log.Println("rpc:" , err) } if !keepReading { break } if req != nil { server.sendResponse(sending, req, invalidRequest, codec, err.Error()) server.freeRequest(req) } continue } wg.Add(1 ) go service.call(server, sending, wg, mtype, req, argv, replyv, codec) } wg.Wait() codec.Close() }
ServeCodec
方法的主要逻辑是一个for循环,在for循环中主要有两个方法:
server.readRequest
——读取请求数据并解码
server.call
——调用客户端要调用的方法,将返回值返回给客户端 接下来我们看server.readRequest
方法:
server.readRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool , err error ) { service, mtype, req, keepReading, err = server.readRequestHeader(codec) if err != nil { if !keepReading { return } codec.ReadRequestBody(nil ) return } argIsValue := false if mtype.ArgType.Kind() == reflect.Pointer { argv = reflect.New(mtype.ArgType.Elem()) } else { argv = reflect.New(mtype.ArgType) argIsValue = true } if err = codec.ReadRequestBody(argv.Interface()); err != nil { return } if argIsValue { argv = argv.Elem() } replyv = reflect.New(mtype.ReplyType.Elem()) switch mtype.ReplyType.Elem().Kind() { case reflect.Map: replyv.Elem().Set(reflect.MakeMap(mtype.ReplyType.Elem())) case reflect.Slice: replyv.Elem().Set(reflect.MakeSlice(mtype.ReplyType.Elem(), 0 , 0 )) } return }
readRequest
方法同样包含三个步骤:
解析头部信息:若出错且选择跳过本次请求(keepReading==true)
等待处理下一个请求,这时需要取出连接中的本次请求的消息主体避免影响对读取下一次请求时出错
解析请求主体信息:通过要调用的方法对应的参数类型来构造参数实例指针(通过reflect.New),然后再通过ReadRequestBody
解码参数信息。
构造响应值实例:调用反射的MakeSlice
或者 MakeMap
方法来申请内存(创建实例),构造响应实例。
net/rpc 将消息分为头部和主体两部分:
对于Request
,代码如下:
对于Response
,代码如下:
由于每次请求和响应都需要定义Request/Response
对象,为了减少内存分配,net/rpc
实现了对象的复用,通过链表(freeReq/freeResp)
的方式实现了一个对象池。这部分不展开细说了。
所有工作都准备好了,然后就开始call
处理了:
service.call
需要注意的几点包括:
反射的Method
类型的Func
字段记录了调用方法所需的信息,包括方法地址等;
调用rpc
方法时需要传递参数:调用的方法所属的结构体实例、方法参数、方法响应值;
returnValues
中的对象是reflect.Value
类型,转为interface{}
类型再转为确切的类型。
之后调用server.sendResponse
发送响应,释放请求。
至此,一个rpc
服务端流程走完。
client 端 客户端代码很简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func main () { client, err := rpc.Dial("tcp" , "localhost:1234" ) if err != nil { log.Fatal("dialing:" , err) } var reply string err = client.Call("HelloService.Hello" , "hello" , &reply) if err != nil { log.Fatal(err) } fmt.Println(reply) }
首先是通过 rpc.Dial
拨号 RPC
服务,然后通过 client.Call
调用具体的 RPC
方法。在调用 client.Call
时,第一个参数是用点号连接的 RPC
服务名字和方法名字,第二和第三个参数分别我们定义 RPC
方法的两个参数。
从代码中可以看到,默认的是gob
反序列化。
然后看一下 NewClientWithCodec
部分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func NewClientWithCodec (codec ClientCodec) *Client { client := &Client{ codec: codec, pending: make (map [uint64 ]*Call), } go client.input() return client }type ClientCodec interface { WriteRequest(*Request, any) error ReadResponseHeader(*Response) error ReadResponseBody(any) error Close() error }
之前在服务端代码解析中,说过实现ServerCodec
接口的方法就可以进行个性化开发,这里也是这样,实现ClientCodec
接口,才能进行个性化开发。区别在于客户端是写请求读响应,而服务端是读请求写响应。
在这里需要注意的是,在NewClientWithCodec
方法中,通过异步的方式调用了client.input
方法,这个方法其实是对返回值的处理 。
到此我们先暂停一下,先看下Call
的逻辑。
通过client.Call
方法调用指定的RPC
方法,属于同步调用,本质上调用了Go
方法,然后等待接收调用结束信号,信号由Done
传递。同步异步调用的控制也是在这里通过done
这个channel
来控制的。
client.Go
方法主要做的过程就是初始化 call
结构体,之后通过client.send
方法发送请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (client *Client) send(call *Call) { client.reqMutex.Lock() defer client.reqMutex.Unlock() client.mutex.Lock() if client.shutdown || client.closing { client.mutex.Unlock() call.Error = ErrShutdown call.done() return } seq := client.seq client.seq++ client.pending[seq] = call client.mutex.Unlock() client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) if err != nil { client.mutex.Lock() call = client.pending[seq] delete (client.pending, seq) client.mutex.Unlock() if call != nil { call.Error = err call.done() } } }
在前面,我们提到,在创建客户端的过程中,异步调用了client.input
的方法用于对RPC调用的返回值进行处理。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 func (client *Client) input() { var err error var response Response for err == nil { response = Response{} err = client.codec.ReadResponseHeader(&response) if err != nil { break } seq := response.Seq client.mutex.Lock() call := client.pending[seq] delete (client.pending, seq) client.mutex.Unlock() switch { case call == nil : err = client.codec.ReadResponseBody(nil ) if err != nil { err = errors.New("reading error body: " + err.Error()) } case response.Error != "" : call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil ) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default : err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:" , err) } }
主要流程包括:
从socket
连接中轮询获取响应消息(消息头+消息体)
首先读取消息头,通过序列号seq
获取待处理请求
读取请求体信息
Call == nil
:此时我们没有pending
的call
,意味着写请求失败了,并且call
已经删除,response
是一个error
信息,此时我们仍需要去读response body
。
response.Error != ""
:RPC
方法内部出错,需要将响应消息读取出来但是不需要得到具体的消息内容。call.Error = ServerError(response.Error)
设置请求的返回值err
,ServerError
是string
的别名。
正常的处理流程
处理过程出错,退出循环,关闭连接
处理服务端响应是启动一个goroutine
进行轮询,为了防止在向服务端发送请求时该goroutine
因出错而要关闭连接,因此采用client.reqMutex
。
使用client.mutex
是该逻辑涉及对map
的读取,对client
一些属性的写入,防止写入/读取冲突
client.shutdown = true
表示客户端异常退出,因此需要处理client.pending
中待处理的call
,防止一些RPC
调用在Call
方法处阻塞等待(<-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done)
更安全的 RPC 接口 在涉及 RPC
的应用中,作为开发人员一般至少有三种角色:首先是服务端实现 RPC
方法的开发人员,其次是客户端调用 RPC
方法的人员,最后也是最重要的是制定服务端和客户端 RPC
接口规范的设计人员。在前面的例子中我们为了简化将以上几种角色的工作全部放到了一起,虽然看似实现简单,但是不利于后期的维护和工作的切割。
如果要重构 HelloService
服务,第一步需要明确服务的名字和接口:
1 2 3 4 5 6 7 8 9 const HelloServiceName = "path/to/pkg.HelloService" type HelloServiceInterface interface { Hello(request string , reply *string ) error }func RegisterHelloService (svc HelloServiceInterface) error { return rpc.RegisterName(HelloServiceName, svc) }
我们将 RPC
服务的接口规范分为三个部分:首先是服务的名字,然后是服务要实现的详细方法列表,最后是注册该类型服务的函数。为了避免名字冲突,我们在 RPC
服务的名字中增加了包路径前缀(这个是 RPC
服务抽象的包路径,并非完全等价 Go 语言的包路径)。RegisterHelloService
注册服务时,编译器会要求传入的对象满足 HelloServiceInterface
接口。
在定义了 RPC 服务接口规范之后,客户端就可以根据规范编写 RPC 调用的代码了:
1 2 3 4 5 6 7 8 9 10 11 12 func main () { client, err := rpc.Dial("tcp" , "localhost:1234" ) if err != nil { log.Fatal("dialing:" , err) } var reply string err = client.Call(HelloServiceName+".Hello" , "hello" , &reply) if err != nil { log.Fatal(err) } }
其中唯一的变化是 client.Call
的第一个参数用 HelloServiceName+".Hello"
代替了 "HelloService.Hello"
。然而通过 client.Call
函数调用 RPC 方法依然比较繁琐,同时参数的类型依然无法得到编译器提供的安全保障。
为了简化客户端用户调用 RPC 函数,我们在可以在接口规范部分增加对客户端的简单包装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type HelloServiceClient struct { *rpc.Client }var _ HelloServiceInterface = (*HelloServiceClient)(nil )func DialHelloService (network, address string ) (*HelloServiceClient, error ) { c, err := rpc.Dial(network, address) if err != nil { return nil , err } return &HelloServiceClient{Client: c}, nil }func (p *HelloServiceClient) Hello(request string , reply *string ) error { return p.Client.Call(HelloServiceName+".Hello" , request, reply) }
我们在接口规范中针对客户端新增加了 HelloServiceClient
类型,该类型也必须满足 HelloServiceInterface
接口,这样客户端用户就可以直接通过接口对应的方法调用 RPC
函数。同时提供了一个 DialHelloService
方法,直接拨号 HelloService
服务。
基于新的客户端接口,我们可以简化客户端用户的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 func main () { client, err := DialHelloService("tcp" , "localhost:1234" ) if err != nil { log.Fatal("dialing:" , err) } var reply string err = client.Hello("hello" , &reply) if err != nil { log.Fatal(err) } }
现在客户端用户不用再担心 RPC
方法名字或参数类型不匹配等低级错误的发生。
最后是基于 RPC
接口规范编写真实的服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 type HelloService struct {}func (p *HelloService) Hello(request string , reply *string ) error { *reply = "hello:" + request return nil }func main () { RegisterHelloService(new (HelloService)) listener, err := net.Listen("tcp" , ":1234" ) if err != nil { log.Fatal("ListenTCP error:" , err) } for { conn, err := listener.Accept() if err != nil { log.Fatal("Accept error:" , err) } go rpc.ServeConn(conn) } }
Accept
方法主要是一个不断接受新的请求连接的for
循环 ,一旦监听器接收了一个连接,之后为每个连接开一个go
协程调用 ServerConn
进行处理。
在新的 RPC
服务端实现中,我们用 RegisterHelloService
函数来注册函数,这样不仅可以避免命名服务名称的工作,同时也保证了传入的服务对象满足了 RPC
接口的定义。最后我们新的服务改为支持多个 TCP
连接,然后为每个 TCP
连接提供 RPC
服务。
跨语言的 RPC 标准库的 RPC
默认采用 Go 语言特有的 gob 编码,因此从其它语言调用 Go 语言实现的 RPC 服务将比较困难。在互联网的微服务时代,每个 RPC 以及服务的使用者都可能采用不同的编程语言,因此跨语言是互联网时代 RPC 的一个首要条件。得益于 RPC 的框架设计,Go 语言的 RPC 其实也是很容易实现跨语言支持的。
Go 语言的 RPC 框架有两个比较有特色的设计:一个是 RPC 数据打包时可以通过插件实现自定义的编码和解码;另一个是 RPC 建立在抽象的 io.ReadWriteCloser
接口之上的,我们可以将 RPC 架设在不同的通讯协议之上。这里我们将尝试通过官方自带的 net/rpc/jsonrpc
扩展实现一个跨语言的 RPC。
首先是基于 json 编码重新实现 RPC 服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func main () { rpc.RegisterName("HelloService" , new (HelloService)) listener, err := net.Listen("tcp" , ":1234" ) if err != nil { log.Fatal("ListenTCP error:" , err) } for { conn, err := listener.Accept() if err != nil { log.Fatal("Accept error:" , err) } go rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) } }
代码中最大的变化是用 rpc.ServeCodec
函数替代了 rpc.ServeConn
函数,传入的参数是针对服务端的 json
编解码器。
我们可以看一下 rpc.ServeCodec
方法:
入参是 ServerCodec
接口:
所以只要实现了 ServerCodec
接口就行,而内置的 jsonrpc
就已经实现了该接口。
然后是实现 json 版本的客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { conn, err := net.Dial("tcp" , "localhost:1234" ) if err != nil { log.Fatal("net.Dial:" , err) } client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) var reply string err = client.Call("HelloService.Hello" , "hello" , &reply) if err != nil { log.Fatal(err) } fmt.Println(reply) }
先手工调用 net.Dial
函数建立 TCP
连接,然后基于该连接建立针对客户端的 json
编解码器。
在确保客户端可以正常调用 RPC
服务的方法之后,我们用一个普通的 TCP
服务代替 Go
语言版本的 RPC
服务,这样可以查看客户端调用时发送的数据格式。比如通过 nc
命令 nc -l 1234
在同样的端口启动一个 TCP
服务。然后再次执行一次 RPC
调用将会发现 nc
输出了以下的信息:
通过上图就可以知道 client
端 jsonrpc
发送的数据格式是什么样的。
这是一个 json 编码的数据,其中 method 部分对应要调用的 rpc 服务和方法组合成的名字,params 部分的第一个元素为参数,id 是由调用端维护的一个唯一的调用编号。
请求的 json
数据对象在内部对应两个结构体:客户端是 clientRequest,服务端是 serverRequest。clientRequest 和 serverRequest 结构体的内容基本是一致的:
可以查看 src/net/rpc/jsonrpc
包的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 type clientRequest struct { Method string `json:"method"` Params [1 ]any `json:"params"` Id uint64 `json:"id"` }type clientResponse struct { Id uint64 `json:"id"` Result *json.RawMessage `json:"result"` Error any `json:"error"` }type serverResponse struct { Id *json.RawMessage `json:"id"` Result any `json:"result"` Error any `json:"error"` }type serverRequest struct { Method string `json:"method"` Params *json.RawMessage `json:"params"` Id *json.RawMessage `json:"id"` }
在获取到 RPC 调用对应的 json 数据后,我们可以通过直接向架设了 RPC 服务的 TCP 服务器发送 json 数据模拟 RPC 方法调用:
1 echo -e '{"method":"HelloService.Hello","params":["ezreal"],"id":1}' | nc localhost 1234
返回的结果也是一个 json 格式的数据:
1 { "id" : 1 , "result" : "hello:hello" , "error" : null }
因此无论采用何种语言,只要遵循同样的 json 结构,以同样的流程就可以和 Go 语言编写的 RPC 服务进行通信。这样我们就实现了跨语言的 RPC。
Http 上的 RPC Go 语言内在的 RPC 框架已经支持在 Http 协议上提供 RPC 服务。但是框架的 http 服务同样采用了内置的 gob 协议,并且没有提供采用其它协议的接口,因此从其它语言依然无法访问的。在前面的例子中,我们已经实现了在 TCP 协议之上运行 jsonrpc 服务,并且通过 nc 命令行工具成功实现了 RPC 方法调用。现在我们尝试在 http 协议上提供 jsonrpc 服务。
这部分看代码基本就能看出来:
所以不在赘述。