当前, rpcx支持tcp、kcp、quic、unix domain、http、jsonrpc 2.0等传输协议,并没有考虑websocket的支持,原因在于考虑到微服务更多的是企业内部服务之间的通讯,不太可能暴露给浏览器,企业内部大多采用tcp的方式传输,或者udp族(kcp、quic)方式的传输,但是还是有用户提出希望能支持websocket。
我想websocket可能还是会有一些场景下使用,比如游戏开发,移动端核服务端的通讯等等,所以经过慎重考虑,我决定在 rpcx 1.6.2版本中增加对websocket的支持。
基于rpcx良好的架构设计,实现对websocket的支持并不难。
本来,rpcx对kcp、quic的支持也不复杂,原因在于rpcx的通讯采用的net.Conn
的这样一个通用的接口,listent.Accept
方法接受客户端的请求后返回一个net.Conn
对象,后续请求的解析、处理,写回都是在这个通用接口上操作的,所以非常容易支持新的传输协议。
以前rpcx还支持其它的udp的扩展协议,比如
因为这两个协议用的人很少,所以在rpcx 1.6中把这两个协议的支持去掉了。
所以,对于websocket来说,找到一个可信赖的、合适的websocket库,可以很容易的实现基于websocket传输的微服务框架。
目前Go生态圈常用的websocket库如下:
虽然像gobwas/ws
性能优良、gorilla/websocket
功能齐全,但是考虑到API的便利性,我还是决定使用Go官方的x/net/websocket
库,一来有官方背书,值得信赖,二来rpcx使用websocket功能比较简单,就是期望能获得一个net.Conn
对象,x/net/websocket
库正好能满足需求。
服务端
如果服务端采用websocket协议,那么network应该写成ws
或者wss
,而不是tcp
或者quic
。
服务端会启动一个http server处理websocket请求。 默认的websocket path为 share.DefaultRPCPath
,只要服务端核客户端保持一致即可。
ServeWS
会传入一个*websocket.Conn
参数,它实现了net.Conn
接口,所以后续的处理使用通用的s.serveConn(conn)
即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func (s *Server) Serve(network, address string) (err error) { ...... if network == "ws" || network == "wss" { s.serveByWS(ln, "") return nil } ...... } func (s *Server) serveByWS(ln net.Listener, rpcPath string) { s.ln = ln if rpcPath == "" { rpcPath = share.DefaultRPCPath } mux := http.NewServeMux() mux.Handle(rpcPath, websocket.Handler(s.ServeWS)) srv := &http.Server{Handler: mux} srv.Serve(ln) } func (s *Server) ServeWS(conn *websocket.Conn) { s.mu.Lock() s.activeConn[conn] = struct{}{} s.mu.Unlock() s.serveConn(conn) }
|
服务端的改造就如此简单,接下来看看如何部署一个服务端的websocket服务:
server.go1 2 3 4 5 6 7 8 9 10
| func main() { flag.Parse() s := server.NewServer() s.RegisterName("Arith", new(Arith), "") err := s.Serve("ws", *addr) if err != nil { panic(err) } }
|
客户端
客户端最重要的是要实现一个websocket协议的建连,如果实现了这个方法,客户端的改造基本就完成了。这个方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func newDirectWSConn(c *Client, network, address string) (net.Conn, error) { if c == nil { return nil, errors.New("empty client") } path := c.option.RPCPath if path == "" { path = share.DefaultRPCPath } var conn net.Conn var err error var url, origin string if network == "ws" { url = fmt.Sprintf("ws://%s%s", address, path) origin = fmt.Sprintf("http://%s", address) } else { url = fmt.Sprintf("wss://%s%s", address, path) origin = fmt.Sprintf("https://%s", address) } if c.option.TLSConfig != nil { config, err := websocket.NewConfig(url, origin) if err != nil { return nil, err } config.TlsConfig = c.option.TLSConfig conn, err = websocket.DialConfig(config) } else { conn, err = websocket.Dial(url, "", origin) } return conn, err }
|
在建立连接的时候为ws
和wss
协议调用这个方法即可:
1 2 3 4 5 6 7 8 9 10 11 12
| func (c *Client) Connect(network, address string) error { var conn net.Conn var err error switch network { case "http": conn, err = newDirectHTTPConn(c, network, address) case "ws", "wss": conn, err = newDirectWSConn(c, network, address) case "kcp": conn, err = newDirectKCPConn(c, network, address) .......
|
客户端改造完成后,我们写一个客户端调用改车把刚才启动的websocket服务:
client.fo1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| func main() { flag.Parse() d, _ := client.NewPeer2PeerDiscovery("ws@"+*addr, "") opt := client.DefaultOption opt.SerializeType = protocol.JSON xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt) defer xclient.Close() args := example.Args{ A: 10, B: 20, } reply := &example.Reply{} err := xclient.Call(context.Background(), "Mul", args, reply) if err != nil { log.Fatalf("failed to call: %v", err) } log.Printf("%d * %d = %d", args.A, args.B, reply.C) }
|
所以你看到,对于一个新的传输协议来说,如果它的连接能够遵循net.Conn
接口,rpcx支持它还是很容易的。
完整的使用websocket传输协议的例子: websocket example。