最近对rpcx做的一些优化以及一些优化尝试

最近在做2022 Go生态圈 rpc 框架 Benchmark之前,专门花了一星期时间,对rpcx进行了重点的优化,这篇文章专门记录一下几个重要的优化点,供大家参考。

增加handler方式,避免服务端使用reflect

在之前的rpcx实现中,参考的是标准库rpc的注册方式,一个服务的注册如下:

1
rpcxserver.RegisterName("Hello", new(Hello), "")

它实际是通过反射的方式遍历这个rcvr,找到它的服务方法和参数类型,并缓存下来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *Server) register(rcvr interface{}, name string) (string, error) {
s.serviceMapMu.Lock()
defer s.serviceMapMu.Unlock()
service := new(service)
service.typ = reflect.TypeOf(rcvr)
service.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(service.rcvr).Type().Name() // Type
service.name = sname
// Install the methods
service.method = suitableMethods(service.typ, true)
if len(service.method) == 0 {
......
}
s.serviceMap[service.name] = service
return sname, nil
}

然后在处理请求的时候,根据调用的服务和方法,找到相应的类型,对请求类型和返回类型利用reflect产生相应的值,再利用reflect的function.Call执行方法调用,虽然中间使用了池化的技术,但是内部还是大量使用的反射的功能,性能损失不少。

Go标准库http router的实现给我了灵感,而lesismal/arpc的应用和性能优异的表现,促使我决定在rpcx增加一个更有效的服务处理方式,类型配置http handler一样的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func hello(ctx *server.Context) error {
msg := &proto.BenchmarkMessage{}
err := ctx.Bind(msg)
if err != nil {
return err
}
msg.Field1 = "OK"
msg.Field2 = 100
if *delay > 0 {
time.Sleep(*delay)
} else {
runtime.Gosched()
}
return ctx.Write(msg)
}
var (
host = flag.String("s", "127.0.0.1:8972", "listened ip and port")
delay = flag.Duration("delay", 0, "delay to mock business processing by sleep")
)
func main() {
flag.Parse()
rpcxserver := server.NewServer()
rpcxserver.AddHandler("Hello", "Say", hello)
rpcxserver.Serve("tcp", *host)
}

服务方法签名如func Xxx(ctx *server.Context) error,它会传入一个特制的Context,通过这个Context,能获得请求参数,也可以写回response。

通过ctx.Bind解码出请求参数:

1
2
3
4
5
6
msg := &proto.BenchmarkMessage{}
err := ctx.Bind(msg)
if err != nil {
return err
}

通过ctx.Write(msg)把response写回。

这里不会用到反射。如果你的编解码器使用的不是反射的方式的话,会更高效。

使用goroutine pool

rpcx默认采用每个request至少一个goroutine处理方式。这样如果在高并发的情况下,服务端会有非常巨大的goroutine存在,虽然说Go支持上万的goroutine,但是在goroutine非常多的情况下,(内存)资源占用非常多,对Go的调度和垃圾回收也会有一定的影响,所以在高并发的场景下,采用goroutine 池在一定程度上会提升性能。

说起线程池,网上已经有不下十个goroutine pool (或者叫做worker pool)的实现。rpcx使用的是alitto/pond, 并没有太多的性能的考虑,而是从使用方便性方面去考虑。

如果想使用goroutine池,你可以使用server.WithPool(100, 1000000):

1
rpcxserver := server.NewServer(server.WithPool(100, 1000000))

其中第一个参数是goroutine(worker)的数量,第二个参数capacity是最大的待处理的请求。超过capacity,新的请求就会被阻塞。

解析后的请求会被提交的goroutine pool中。

1
2
3
4
5
6
7
if s.pool != nil {
s.pool.Submit(func() {
s.processOneRequest(ctx, req, conn, writeCh)
})
} else {
go s.processOneRequest(ctx, req, conn, writeCh)
}

调整进程优先级

有时候,你的程序可能会和其它程序混跑,即使没有其它的业务程序,也可能有系统自带的一些应用,如果我们把rpcx程序的优先级设置的高一些,使它有更多的可能被Linux系统程序调度,也会带来更多的优先级。

Linux每个进程都有一个介于 -20 到 19 之间的 nice 值, 值越低会有更多的机会获得Linux的调度。默认情况下,进程的 nice 值为 0。

进程的 nice 值,可以通过 nice 命令和 renice 命令修改,进而调整进程的运行顺序。

nice [-n NI值] 程序 按照指定的优先级启动程序。

renice 命令可以在进程运行时修改其 NI 值,从而调整优先级。renice [优先级] PID

当然你也可以通过程序动态的设置进程的优先级:syscall.Setpriority(syscall.PRIO_PROCESS, 0, -20),这条命令设置本进程的优先级为-20

这个带来10% ~ 20%性能的提升。

我还想尝试只设置listener.Accept的goroutine,让有更多的机会获得处理的机会。不过没有可调用的系统调用去设置。尝试了runtime.LockOSThread()listener.Accept锁定一个线程去去处理,没啥鸟用。

进一步优化想法

当然,还有一些优化的方法,不过还没有验证。

比如redis,采用单线程(主逻辑)的方式处理请求、还有比如nginx,采用多个worker独立的去处理连接。这样可以减少并发锁的请求。
rpcx可以改成单worker的方式,然后启动多个worker去监听和处理请求,可能也会提升性能。不过这个没有进一步的测试。