高性能批量读写网络包 补遗

前一段时间写了一篇高性能批量读写网络包, 里面介绍了sendmmsg系统调用,可以批量发送网络包,有读者询问这和writev有什么区别。

其实看它们的定义,就知道区别在哪里了:

1
2
3
int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, int flags);
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);

sendmmsg()系统调用是sendmsg(2)的扩展,它允许调用方使用单个系统调用在套接字上传输多个消息。 (这对于某些应用程序具有性能优势。)
sockfd参数是要在其上传输数据的套接字的文件描述符。
msgvec参数是指向mmsghdr结构数组的指针。该数组的大小在vlen中指定。
sendmmsg主要应用于网络批量写,如果想对文件IO批量操作,可以使用writevreadv

writev()系统调用写 iovcnt缓存数据 到文件描述符fd中 ("gather output")。它处理的数据是"原子性的"的,不会被其他并发的读写操作所干扰。这意味着在多线程或多进程环境中使用这些函数时,可以安全地进行数据传输,而无需担心数据的分片或混乱。
在常规情况下,writev() 函数不会只写入部分数据。它要么将所有的数据作为一个单一的操作写入,要么返回一个错误,指示写入失败。
然而,有一种特殊情况下可能会发生只写入部分数据的情况,即当使用非阻塞的套接字进行写操作时,并且写缓冲区已满。在这种情况下,writev() 可能只能写入部分数据,剩余的数据将被返回,以便稍后进行写入。此时,需要根据返回的结果进行进一步的处理,以确保所有数据都被正确写入。
因此,在使用 writev() 函数时,建议检查返回的写入字节数,以确保所有数据都被正确写入,如果需要,可以通过重试操作来处理部分写入的情况。

其实,使用sendmmsg和writev发送数据是枯燥和反人类的,需要构造mmsghdr和iovec等特定的数据结构,在上一篇文章中我们介绍了Go对sendmmsg的包装,简化了对sendmmsg的调用。这一篇我们介绍Go对writev系统的封装。Go并没有对readv进行封装,即使有人把实现的代码也贡献出来了,主要是大家觉得还没看到readv带来的性能的提升和益处

Go内部是在src/internal/poll/writev.go中封装的Writev, Linux环境下的wrtev系统调用是在fd_writev_unix.go实现的。

你也可以直接使用WritevReadv,比较偏底层了。

如果你觉得Go封装的Writev还是比较麻烦的话,你可以使用Buffers,它对于实现了writev的conn做了优化,优先调用writeBuffers:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (v *Buffers) WriteTo(w io.Writer) (n int64, err error) {
if wv, ok := w.(buffersWriter); ok {
return wv.writeBuffers(v)
}
for _, b := range *v {
nb, err := w.Write(b)
n += int64(nb)
if err != nil {
v.consume(n)
return n, err
}
}
v.consume(n)
return n, nil
}
// buffersWriter is the interface implemented by Conns that support a
// "writev"-like batch write optimization.
// writeBuffers should fully consume and write all chunks from the
// provided Buffers, else it should report a non-nil error.
type buffersWriter interface {
writeBuffers(*Buffers) (int64, error)
}

下面是一个使用Buffers发送批量消息的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import (
"fmt"
"net"
"strconv"
)
func main() {
conn, err := net.Dial("udp", "192.168.0.1:8972")
if err != nil {
panic(err)
}
var buf net.Buffers
for i := 0; i < 10; i++ {
buf = append(buf, []byte("hello world: "+strconv.Itoa(i)))
}
_, err = buf.WriteTo(conn) // as a datagram packet
if err != nil {
panic(err)
}
var data = make([]byte, 1024)
for {
n, err := conn.Read(data)
if err != nil {
panic(err)
}
fmt.Println(string(data[:n]))
}
}

它把10条消息加入到Buffers,然后一次写入,其实这十条消息是作为一个UDP包发送给服务器的,如果你的发送请求很多,需要批量进行发送的话,可以考虑这个方式。