rpcx支持websocket协议了!

当前, rpcx支持tcp、kcp、quic、unix domain、http、jsonrpc 2.0等传输协议,并没有考虑websocket的支持,原因在于考虑到微服务更多的是企业内部服务之间的通讯,不太可能暴露给浏览器,企业内部大多采用tcp的方式传输,或者udp族(kcp、quic)方式的传输,但是还是有用户提出希望能支持websocket。

我想websocket可能还是会有一些场景下使用,比如游戏开发,移动端核服务端的通讯等等,所以经过慎重考虑,我决定在 rpcx 1.6.2版本中增加对websocket的支持。

基于rpcx良好的架构设计,实现对websocket的支持并不难。

本来,rpcx对kcp、quic的支持也不复杂,原因在于rpcx的通讯采用的net.Conn的这样一个通用的接口,listent.Accept方法接受客户端的请求后返回一个net.Conn对象,后续请求的解析、处理,写回都是在这个通用接口上操作的,所以非常容易支持新的传输协议。

以前rpcx还支持其它的udp的扩展协议,比如

因为这两个协议用的人很少,所以在rpcx 1.6中把这两个协议的支持去掉了。

所以,对于websocket来说,找到一个可信赖的、合适的websocket库,可以很容易的实现基于websocket传输的微服务框架。

目前Go生态圈常用的websocket库如下:

虽然像gobwas/ws性能优良、gorilla/websocket功能齐全,但是考虑到API的便利性,我还是决定使用Go官方的x/net/websocket库,一来有官方背书,值得信赖,二来rpcx使用websocket功能比较简单,就是期望能获得一个net.Conn对象,x/net/websocket库正好能满足需求。

服务端

如果服务端采用websocket协议,那么network应该写成ws或者wss,而不是tcp或者quic

服务端会启动一个http server处理websocket请求。 默认的websocket path为 share.DefaultRPCPath,只要服务端核客户端保持一致即可。

ServeWS会传入一个*websocket.Conn参数,它实现了net.Conn接口,所以后续的处理使用通用的s.serveConn(conn)即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (s *Server) Serve(network, address string) (err error) {
......
if network == "ws" || network == "wss" {
s.serveByWS(ln, "")
return nil
}
......
}
func (s *Server) serveByWS(ln net.Listener, rpcPath string) {
s.ln = ln
if rpcPath == "" {
rpcPath = share.DefaultRPCPath
}
mux := http.NewServeMux()
mux.Handle(rpcPath, websocket.Handler(s.ServeWS))
srv := &http.Server{Handler: mux}
srv.Serve(ln)
}
func (s *Server) ServeWS(conn *websocket.Conn) {
s.mu.Lock()
s.activeConn[conn] = struct{}{}
s.mu.Unlock()
s.serveConn(conn)
}

服务端的改造就如此简单,接下来看看如何部署一个服务端的websocket服务:

server.go
1
2
3
4
5
6
7
8
9
10
func main() {
flag.Parse()
s := server.NewServer()
s.RegisterName("Arith", new(Arith), "")
err := s.Serve("ws", *addr)
if err != nil {
panic(err)
}
}

客户端

客户端最重要的是要实现一个websocket协议的建连,如果实现了这个方法,客户端的改造基本就完成了。这个方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func newDirectWSConn(c *Client, network, address string) (net.Conn, error) {
if c == nil {
return nil, errors.New("empty client")
}
path := c.option.RPCPath
if path == "" {
path = share.DefaultRPCPath
}
var conn net.Conn
var err error
// url := "ws://localhost:12345/ws"
var url, origin string
if network == "ws" {
url = fmt.Sprintf("ws://%s%s", address, path)
origin = fmt.Sprintf("http://%s", address)
} else {
url = fmt.Sprintf("wss://%s%s", address, path)
origin = fmt.Sprintf("https://%s", address)
}
if c.option.TLSConfig != nil {
config, err := websocket.NewConfig(url, origin)
if err != nil {
return nil, err
}
config.TlsConfig = c.option.TLSConfig
conn, err = websocket.DialConfig(config)
} else {
conn, err = websocket.Dial(url, "", origin)
}
return conn, err
}

在建立连接的时候为wswss协议调用这个方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
func (c *Client) Connect(network, address string) error {
var conn net.Conn
var err error
switch network {
case "http":
conn, err = newDirectHTTPConn(c, network, address)
case "ws", "wss":
conn, err = newDirectWSConn(c, network, address)
case "kcp":
conn, err = newDirectKCPConn(c, network, address)
.......

客户端改造完成后,我们写一个客户端调用改车把刚才启动的websocket服务:

client.fo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
flag.Parse()
d, _ := client.NewPeer2PeerDiscovery("ws@"+*addr, "")
opt := client.DefaultOption
opt.SerializeType = protocol.JSON
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
defer xclient.Close()
args := example.Args{
A: 10,
B: 20,
}
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

所以你看到,对于一个新的传输协议来说,如果它的连接能够遵循net.Conn接口,rpcx支持它还是很容易的。

完整的使用websocket传输协议的例子: websocket example