使用BPF, 将Go网络程序的吞吐提升8倍

经典的bpf(classical Berkeley Packet Filter) 是非常好用的一个技术,在一些特殊的Go底层网络编程的场合,可以很好的提高性能。

背景

先前我开发过一个Go UDP应用程序, 客户端和服务端通过UDP程序,通过raw socket进行通讯。程序的目的比较特殊,这里我以一个简单的程序为例介绍。

事实上,我说我使用rawsocket方式并不严谨,我并不是采用下面的方式实现socket并进行通讯的(链路层的方式):

1
2
3
4
5
6
7
fd, err:= syscall.Socket(syscall.AF_PACKET, syscall.SOCK_RAW,syscall.ETH_P_ALL)
if (err != nil) {
fmt.Println("Error: " + err.Error())
return;
}
fmt.Println("Obtained fd ", fd)
defer syscall.Close(fd)

也不是采用下面的rawsocket方式(IP层的方式):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
var err error
fd, e := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_UDP)
if e != nil {
fmt.Println("Problem @ location 1")
}
addr := syscall.SockaddrInet4{
Port: 27289,
Addr: [4]byte{127, 0, 0, 1},
}
p := pkt()
err = syscall.Sendto(fd, p, 0, &addr)
if err != nil {
log.Fatal("Sendto:", err)
}
}

而是直接采用Go标准库中封装好的方法net.ListenPacket("ip4:udp", addr)在IP层进行包的发送和接收。

我通过封装自定义的UDP层的数据结构,实现自定义的包的发送和接收,进行网络的监控。

也许有人说使用标准库的UDPConn不就行了。如果是普通的UDP程序,的确没问题,如果对于一些特殊的需求,比如监听1000个UDP端口,有上万个节点定时的发送监控的数据,我们不太可能建立1000*1万个UDPConn,所以这里我采用rawsocket通讯的方式。

RawSocket是是标准Berkeley socket的一部分,我们使用Go的标准库开发网络程序时,大部分场景都是使用封装好的数据报类型(UDPConn)或者流类型(TCPConn),但是如果想做更底层的一些网络编程的话,就需要使用到RawSocket了,比如更底层的TCP、UDP网络控制、ICMP、ARP等协议。不同的操作系统可能实现的RawSocket也不同,这里我们以Linux环境为例。

Linux man手册对RawSocket相关知识做了详细的介绍:socket(2)packet(7) raw(7),本文不再做转述,这也不是本文的重点。

依照Linux文档,Linux 服务器中收到的包既=会传给内核网络模块,也会传给RawSocket。所以你使用RawSocket的时候有时候需要注意,比如你在处理TCP的包时,可能Linux内核的网络程序已经把这个包处理了。

Raw sockets may tap all IP protocols in Linux, even protocols like ICMP or TCP which have a protocol module in the kernel. In this case, the packets are passed to both the kernel module and the raw socket(s). This should not be relied upon in portable programs, many other BSD socket implementation have limitations here.

如果没有特殊的需求,我们直接就使用net.ListenPacket是实现一个RawSocket的程序。这个方法的签名如下:

1
func ListenPacket(network, address string) (PacketConn, error)

其中第一个参数network可以是udpudp4udp6unixgram,也是可以ipip4ip6加冒号再加一个协议号或者协议名,比如ip:1ip:icmp,就可以你也处理什么协议。

演示程序

服务端程序

服务端程序我们使用conn, err := net.ListenPacket("ip4:udp", *addr)监听本地地址上所有的UDP包,并启动一个goroutine去处理。处理程序中应该还有一个判断,就是检查UDP的端口是不是我们处理的端口,因为这里net.ListenPacket监听的是本地所有的UDP,可能有很多无用的UDP包都传入到用户态的程序中了。

这里我们使用gopacket对各种协议层的包的定义,方便解析(或创建)TCP/IP各层的网络协议。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main
import (
"flag"
"log"
"net"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/smallnest/go-network-programming/codec"
"golang.org/x/net/bpf"
"golang.org/x/net/ipv4"
)
var (
addr = flag.String("s", "localhost", "server address")
port = flag.Int("p", 8972, "port")
)
var (
stat = make(map[string]int)
lastStatTime = int64(0)
)
func main() {
flag.Parse()
conn, err := net.ListenPacket("ip4:udp", *addr)
if err != nil {
panic(err)
}
cc := conn.(*net.IPConn)
cc.SetReadBuffer(20 * 1024 * 1024)
cc.SetWriteBuffer(20 * 1024 * 1024)
handleConn(conn)
}
func handleConn(conn net.PacketConn) {
for {
buffer := make([]byte, 1024)
n, remoteaddr, err := conn.ReadFrom(buffer)
if err != nil {
log.Fatal(err)
}
buffer = buffer[:n]
packet := gopacket.NewPacket(buffer, layers.LayerTypeUDP, gopacket.NoCopy)
// Get the UDP layer from this packet
if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, _ := udpLayer.(*layers.UDP)
if app := packet.ApplicationLayer(); app != nil {
data, err := codec.EncodeUDPPacket(net.ParseIP("127.0.0.1"), net.ParseIP("127.0.0.1"), uint16(udp.DstPort), uint16(udp.SrcPort), app.Payload())
if err != nil {
log.Printf("failed to EncodePacket: %v", err)
return
}
if _, err := conn.WriteTo(data, remoteaddr); err != nil {
log.Printf("failed to write packet: %v", err)
conn.Close()
return
}
}
}
}
}

客户端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main
import (
"fmt"
"log"
"net"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/smallnest/go-network-programming/codec"
)
func main() {
conn, err := net.ListenPacket("ip4:udp", "127.0.0.1")
if err != nil {
panic(err)
}
data, err := codec.EncodeUDPPacket(net.ParseIP("127.0.0.1"), net.ParseIP("127.0.0.1"), 8972, 0, []byte("hello"))
if err != nil {
log.Printf("failed to EncodePacket: %v", err)
return
}
remoteAddr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
if _, err := conn.WriteTo(data, remoteAddr); err != nil {
log.Printf("failed to write packet: %v", err)
conn.Close()
return
}
buffer := make([]byte, 1024)
n, _, err := conn.ReadFrom(buffer)
if err != nil {
log.Fatal(err)
}
buffer = buffer[:n]
packet := gopacket.NewPacket(buffer, layers.LayerTypeUDP, gopacket.NoCopy)
// Get the UDP layer from this packet
if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
if app := packet.ApplicationLayer(); app != nil {
fmt.Printf("reply: %s\n", app.Payload())
}
}
}

客户端程序这里做了简化,写入一个hello,并读取服务端的返回。我们在做性能测试的时候,会使用循环不断的写入一个seq号,并检查服务端是否返回这个seq,以便计算丢包性能。并且还使用一个限流器进行限流,测试在一定的RPS情况下的丢包率。

辅助方法

下面是EncodeUDPPacket方法,用来产生一个UDP的包数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package codec
import (
"net"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
)
func EncodeUDPPacket(localIP, remoteIP net.IP, localPort, remotePort uint16, payload []byte) ([]byte, error) {
ip := &layers.IPv4{
Version: 4,
TTL: 128,
SrcIP: localIP,
DstIP: remoteIP,
Protocol: layers.IPProtocolUDP,
}
udp := &layers.UDP{
SrcPort: layers.UDPPort(localPort),
DstPort: layers.UDPPort(remotePort),
}
udp.SetNetworkLayerForChecksum(ip)
buf := gopacket.NewSerializeBuffer()
opts := gopacket.SerializeOptions{
ComputeChecksums: true,
FixLengths: true,
}
err := gopacket.SerializeLayers(buf, opts, udp, gopacket.Payload(payload))
return buf.Bytes(), err
}

性能问题

虽然上面的程序运行的很好,但是在并发量比较大的情况,会有一些问题。

上面我们启动了一个goroutine去读取这个包,这里是一个性能瓶颈,最终服务端只能使用一个核去处理RawSocket的包。

即使创建多个goroutine去读取这个PacketConn,也是没有用的,因为这个PacketConn是唯一的,它是一个瓶颈,多个goroutine有时候还不如一个goroutine去读取更好。

那么能不能调用多次net.ListenPacket("ip4:udp", *addr),生成多个RawSocket并发的去处理呢?

貌似看起来可以,但是实际上,这多个RawSocket都会读取到相同的UDPPacket,而不是负载均衡平摊到多个Socket上。所以多个RawSocket不但没用,反而更加耗费服务器的资源了。

实际测试也就能达到2~3万的吞吐,并发量再高就出现丢包的情况了。

但是没有办法了么?

也不是。这里我们可以看到,主要性能瓶颈是我们上面的程序没有办法做到负载均衡,利用多核的能力并发的去处理。第二个性能瓶颈就是程序监听了本机所有的UDP的packet,交给用户态程序筛选去处理,这里面有很多我们不需要的包。

这两个性能问题我们都可以通过BPF进行处理。

BPF进行包过滤

经典的BPF早在1994就出现了,虽然大家现在都在谈论扩展的BPF(eBPF),但是经典的BPF依然可以发挥它的威力。

你可能没有在编程中应用过BPF,但是我相信你在实际工作中一定和它发生过什么。

比如你使用tcpdump在监听网络的传输情况时,经常会加上过滤手段,比如下面的命令是只监听tcp协议的8080端口:

1
tcpdump -nn -vvvv -i any "tcp port 8080"

tcpdump其实就是把tcp port 8080生成过滤器,在内核中对包进行过滤,只把过滤后的包打筛选出来。

其实你可以通过下面的命令查看编译的过滤的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@lab ~]# tcpdump -d "tcp port 8080"
(000) ldh [12]
(001) jeq #0x86dd jt 2 jf 8
(002) ldb [20]
(003) jeq #0x6 jt 4 jf 19
(004) ldh [54]
(005) jeq #0x1f90 jt 18 jf 6
(006) ldh [56]
(007) jeq #0x1f90 jt 18 jf 19
(008) jeq #0x800 jt 9 jf 19
(009) ldb [23]
(010) jeq #0x6 jt 11 jf 19
(011) ldh [20]
(012) jset #0x1fff jt 19 jf 13
(013) ldxb 4*([14]&0xf)
(014) ldh [x + 14]
(015) jeq #0x1f90 jt 18 jf 16
(016) ldh [x + 16]
(017) jeq #0x1f90 jt 18 jf 19
(018) ret #262144
(019) ret #0

这代表什么意思的?BPF定义了有限一些指令,可以在VM中,对包进行过滤.
第一行是加载包的偏移量(offset 12个字节),第二行是检查是否是IPV6,如果是跳到002,如果不是跳到008。我们关注一下IPV4。
008那一行是判断是否是ipv4,如果是跳到009009加载偏移量23处的一个字节,它是ip proto,010行判断ip proto是否是TCP,如果是跳到011
接下来判断flags,以便确定数据的地址。
014行和016行是读取TCP协议中的源端口和目的端口,如果等于8080(0x1f90),则最大返回包大小262144个字节,否则就丢弃这个包。

当然tcpdump的生成的代码相当的严谨了我我们实际写的时候,如果确定是ipv4的包,包也没什么扩展的话,写出的代码要比这个简单。但是我们实际在应用BPF的时候不妨采用tcpdump生成的代码,不会出错。

使用-dd可以显示成c代码片段,使用-ddd显示为十进制数。我们看一下-dd的效果,因为这个结果我们可以用来转换成Go的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@lab ~]# tcpdump -dd "tcp port 8080"
{ 0x28, 0, 0, 0x0000000c },
{ 0x15, 0, 6, 0x000086dd },
{ 0x30, 0, 0, 0x00000014 },
{ 0x15, 0, 15, 0x00000006 },
{ 0x28, 0, 0, 0x00000036 },
{ 0x15, 12, 0, 0x00001f90 },
{ 0x28, 0, 0, 0x00000038 },
{ 0x15, 10, 11, 0x00001f90 },
{ 0x15, 0, 10, 0x00000800 },
{ 0x30, 0, 0, 0x00000017 },
{ 0x15, 0, 8, 0x00000006 },
{ 0x28, 0, 0, 0x00000014 },
{ 0x45, 6, 0, 0x00001fff },
{ 0xb1, 0, 0, 0x0000000e },
{ 0x48, 0, 0, 0x0000000e },
{ 0x15, 2, 0, 0x00001f90 },
{ 0x48, 0, 0, 0x00000010 },
{ 0x15, 0, 1, 0x00001f90 },
{ 0x6, 0, 0, 0x00040000 },
{ 0x6, 0, 0, 0x00000000 },

实际上,x/net/bpf提供了相应的方法,可以更容易的编写BPF程序,序列化和反序列化。比如编写一个过滤出目的端口等于8972的所有的包,我们可以简单写成如下的格式(考虑到简单形式,我们只考虑了IPV4和普通IP包的形式):

1
2
3
4
5
6
7
8
9
type Filter []bpf.Instruction
var filter = Filter{
bpf.LoadAbsolute{Off: 22, Size: 2}, //加载目的端口到寄存器
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 8972, SkipFalse: 1}, // 如果值等于8972的话,执行下一行,否则跳过下一行
bpf.RetConstant{Val: 0xffff}, // 返回这个包的最多0xffff的字节的数据
bpf.RetConstant{Val: 0x0}, // 返回零个字节,也就是忽略这个包
}

我们可以写一个程序,把tcpdump生成的代码转换成bpf的RawInstruction指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func parse(data string) (raws []bpf.RawInstruction) {
lines := strings.Split(data, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
line = strings.TrimPrefix(line, "{")
line = strings.TrimSuffix(line, " },")
items := strings.Split(line, ",")
// assert len(items) == 4
raw := bpf.RawInstruction{
Op: uint16(numToInteger(items[0])),
Jt: uint8(numToInteger(items[1])),
Jf: uint8(numToInteger(items[2])),
K: uint32(numToInteger(items[3])),
}
raws = append(raws, raw)
}
return raws
}
func numToInteger(s string) int {
s = strings.TrimSpace(s)
if strings.HasPrefix(s, "0x") {
s := strings.Replace(s, "0x", "", -1)
result, _ := strconv.ParseInt(s, 16, 64)
return int(result)
}
result, _ := strconv.ParseInt(s, 10, 64)
return int(result)
}

好了所有这一切都准备好了,背景知识介绍了,当前的RawSocket程序的性能瓶颈也介绍了,那么如果解决性能瓶颈呢。

第一个性能瓶颈我们可以生成多个goroutine,每个goroutine负责过滤一部分的包,这样就实现了负载均衡。比如根据客户端的IP进行过滤,或者服务端监听1000个端口,每个goroutine只负责一部分的端口。而可以根据客户端的源端口进行过滤等等。总是,通过BPF过滤,一个goroutine只负责一部分的packet,实现了多核的处理。

第二个瓶颈随着第一个问题也迎刃而解。因为BPF只过滤我们的特定的端口,其它端口的UDP包不会从内核态复制到用户态,减少了无用包的处理。

要为标准库的PacketConn设置BPF过滤,也有多种办法,比如调用syscall.SetsockoptInt进行设置。但是golang.org/x/net/ipv4提供了SetBPF方法,我们可以直接将标准库的PacketConn转换成ipv4.PacketConn,再进行设置。

比如上面的server程序,我们可以修改为使用BPF在内核态过滤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main
import (
"flag"
"log"
"net"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/smallnest/go-network-programming/codec"
"golang.org/x/net/bpf"
"golang.org/x/net/ipv4"
)
var (
addr = flag.String("s", "localhost", "server address")
port = flag.Int("p", 8972, "port")
)
var (
stat = make(map[string]int)
lastStatTime = int64(0)
)
func main() {
flag.Parse()
conn, err := net.ListenPacket("ip4:udp", *addr)
if err != nil {
panic(err)
}
cc := conn.(*net.IPConn)
cc.SetReadBuffer(20 * 1024 * 1024)
cc.SetWriteBuffer(20 * 1024 * 1024)
pconn := ipv4.NewPacketConn(conn)
var assembled []bpf.RawInstruction
if assembled, err = bpf.Assemble(filter); err != nil {
log.Print(err)
return
}
pconn.SetBPF(assembled)
handleConn(conn)
}
func handleConn(conn net.PacketConn) {
for {
buffer := make([]byte, 1024)
n, remoteaddr, err := conn.ReadFrom(buffer)
if err != nil {
log.Fatal(err)
}
buffer = buffer[:n]
packet := gopacket.NewPacket(buffer, layers.LayerTypeUDP, gopacket.NoCopy)
// Get the UDP layer from this packet
if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, _ := udpLayer.(*layers.UDP)
if app := packet.ApplicationLayer(); app != nil {
data, err := codec.EncodeUDPPacket(net.ParseIP("127.0.0.1"), net.ParseIP("127.0.0.1"), uint16(udp.DstPort), uint16(udp.SrcPort), app.Payload())
if err != nil {
log.Printf("failed to EncodePacket: %v", err)
return
}
if _, err := conn.WriteTo(data, remoteaddr); err != nil {
log.Printf("failed to write packet: %v", err)
conn.Close()
return
}
}
}
}
}
type Filter []bpf.Instruction
var filter = Filter{
bpf.LoadAbsolute{Off: 22, Size: 2}, // load the destination port
bpf.JumpIf{Cond: bpf.JumpEqual, Val: uint32(*port), SkipFalse: 1}, // if Val != 8972 skip next instruction
bpf.RetConstant{Val: 0xffff}, // return 0xffff bytes (or less) from packet
bpf.RetConstant{Val: 0x0}, // return 0 bytes, effectively ignore this packet
}