使用Go语言实现 pping

大家好,我是鸟窝。

在前一篇“pping:被动式ping,计算网络时延”一篇中,我给大家介绍了 pping 这个工具的原理和使用方法。这篇文章中,我将使用 Go 语言实现 pping 工具。

通过这篇文章,你将了解到:

  • 如何使用gopacket来捕获和解析网络数据包
  • 如何设置捕获时长和过滤捕获的数据包
  • 如何在CGO下静态编译库,如libpcap
  • 了解TCP/IP协议栈的基本知识,如TCP Option
  • 如何进行数据的统计和定时输出和清理
  • 如何使用 pflag 来解析命令行参数

代码在: github.com/smallnest/pping-go

使用libpcap捕获数据包,并进行包过滤

我们并不直接使用libpcap,而是使用封装好的gopacket

gopacket是一个用于处理数据包的库,它提供了一个高级的API,可以用来处理数据包的解析、分析和生成。它支持多种数据包格式,包括Ethernet、IPv4、IPv6、TCP、UDP、ICMP等。

我们可以使用gopacket来捕获数据包,然后使用gopacket/layers包来解析数据包的各个部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建一个新的非活动 pcap 句柄, *liveInp是网卡的名称
inactive, _ := pcap.NewInactiveHandle(*liveInp)
// 使用 defer 关键字确保在函数结束时清理非活动句柄
defer inactive.CleanUp()
// 设置捕获的数据包的最大长度
inactive.SetSnapLen(snapLen)
// 激活非活动句柄,返回一个活动句柄和可能的错误
snif, err = inactive.Activate()
// 如果在激活句柄时出现错误,我们打印错误并退出程序
if err != nil {
fmt.Printf("couldn't open %s: %v\n", *fname, err)
os.Exit(1)
}

当然你也可以从一个tcpdump这样的工具捕获的pcap文件中解析包:

1
2
3
4
5
6
7
// 使用 pcap.OpenOffline 函数打开一个离线 pcap 文件,返回一个 pcap 句柄和可能的错误
snif, err = pcap.OpenOffline(*fname)
// 如果在打开文件时出现错误,我们打印错误并退出程序
if err != nil {
fmt.Printf("couldn't open %s: %v\n", *fname, err)
os.Exit(1)
}

之后设置filter进行包过滤, filter的格式和tcpdump使用的过滤格式一样,默认它会加上TCP,只处理TCP的包:

1
2
// 使用 SetBPFFilter 方法设置 BPF 过滤器,过滤器的规则由变量 filter 定义
snif.SetBPFFilter(filter)

之后处理这个包:

1
2
3
4
5
6
7
8
9
10
11
12
13
src := gopacket.NewPacketSource(snif, layers.LayerTypeEthernet)
// 使用 src.Packets() 获取一个数据包通道,我们可以从这个通道中读取数据包
packets := src.Packets()
for packet := range packets {
processPacket(packet)
......
// 如果结束或者需要定期打印统计信息,可以使用下面的代码
......
// 如果需要清理过期的数据
......

解析包

从TCP Option中解析时间戳的函数是getTSFromTCPOpts,它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// getTSFromTCPOpts 用于从 TCP 选项中获取时间戳信息
func getTSFromTCPOpts(tcp *layers.TCP) (uint32, uint32) {
var tsval, tsecr uint32
opts := tcp.Options
for _, opt := range opts {
if opt.OptionType == layers.TCPOptionKindTimestamps && opt.OptionLength == 10 { // Timestamp 选项长度为 10 字节
tsval = binary.BigEndian.Uint32(opt.OptionData[0:4])
tsecr = binary.BigEndian.Uint32(opt.OptionData[4:8])
break
}
}
return tsval, tsecr
}

解析IP和TCP包,并从TCP包的Option解析出时间戳:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// processPacket 用于处理捕获到的数据包
func processPacket(pkt gopacket.Packet) {
// 从数据包中获取 TCP 层
tcpLayer := pkt.Layer(layers.LayerTypeTCP)
if tcpLayer == nil {
not_tcp++
return
}
tcp, _ := tcpLayer.(*layers.TCP)
// 从 TCP 选项中获取时间戳信息
// 如果 TSval 为 0 或者 TSecr 为 0 并且不是 SYN 包,则不处理该数据包
tsval, tsecr := getTSFromTCPOpts(tcp)
if tsval == 0 || (tsecr == 0 && !tcp.SYN) {
no_TS++
return
}
// 从数据包中获取网络层
// 如果网络层不是 IPv4 或 IPv6,则不处理该数据包
netLayer := pkt.Layer(layers.LayerTypeIPv4)
if netLayer == nil {
netLayer = pkt.Layer(layers.LayerTypeIPv6)
if netLayer == nil {
not_v4or6++
return
}
}

目前为止我们从包中解析除了IP包和TCP包,接下里我们得到源目IP和源目端口,以及捕获时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 从网络层中获取源 IP 和目的 IP
// 从 TCP 层中获取源端口和目的端口
// 用于构建流的源和目的
var ipsStr, ipdStr string
if ip, ok := netLayer.(*layers.IPv4); ok {
ipsStr = ip.SrcIP.String()
ipdStr = ip.DstIP.String()
} else {
ip := netLayer.(*layers.IPv6)
ipsStr = ip.SrcIP.String()
ipdStr = ip.DstIP.String()
}
srcStr := ipsStr + ":" + strconv.Itoa(int(tcp.SrcPort))
dstStr := ipdStr + ":" + strconv.Itoa(int(tcp.DstPort))
// 从数据包中获取捕获时间
captureTime := pkt.Metadata().CaptureInfo.Timestamp
// 如果 offTm 小于 0,则将捕获时间设置为 offTm
if offTm < 0 {
offTm = captureTime.Unix()
startm = float64(captureTime.Nanosecond()) * 1e-9
// 如果 sumInt 大于 0,则打印第一个数据包的时间
capTm = startm
if sumInt > 0 {
fmt.Printf("first packet at %s\n", captureTime.Format(time.UnixDate))
}
} else {
capTm = float64(captureTime.Unix()-offTm) + float64(captureTime.Nanosecond())*1e-9
}

接下来是从全局哈希表flows中查找流,如果没有则创建一个新的流,如果反向流已经存在,则设置反向流。如果反向流不存在,不处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fstr := srcStr + "+" + dstStr
fr, ok := flows[fstr]
if !ok { // 新流
// 如果流的数量大于 maxFlows,则返回
if flowCnt >= maxFlows {
return
}
fr = &flowRec{
flowname: fstr,
min: 1e30,
}
flows[fstr] = fr
flowCnt++
// 如果反向流已经存在,则设置反向流
if _, ok := flows[dstStr+"+"+srcStr]; ok {
flows[dstStr+"+"+srcStr].revFlow = true
fr.revFlow = true
}
}
fr.last_tm = capTm
// 如果反向流不存在,不处理
if !fr.revFlow {
uniDir++
return
}

既然找到反向流了,说明正向反向的两个packet我们都获取到了了,那么就可以利用两次的捕获时间计算 RTT 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 统计流的发送字节数
arr_fwd := fr.bytesSnt + float64(pkt.Metadata().Length)
fr.bytesSnt = arr_fwd
// 增加时间戳
if !filtLocal || localIP != ipdStr {
addTS(fstr+"+"+strconv.FormatUint(uint64(tsval), 10), &tsInfo{capTm, arr_fwd, fr.bytesDep})
}
// 处理对应的反向流
ti := getTS(dstStr + "+" + srcStr + "+" + strconv.FormatUint(uint64(tsecr), 10))
if ti != nil && ti.t > 0.0 {
// 这是返回的数据包的捕获时间
t := ti.t
rtt := capTm - t
if fr.min > rtt {
fr.min = rtt // 跟踪最小值
}
// fBytes 存储了从源到目标的数据流的字节数
fBytes := ti.fBytes
// dBytes 存储了从目标到源的数据流的字节数
dBytes := ti.dBytes
// pBytes 存储了从上一次发送到现在的数据包的字节数
pBytes := arr_fwd - fr.lstBytesSnt
// 更新上一次发送的字节数为当前的发送字节数
fr.lstBytesSnt = arr_fwd
// 更新反向流的依赖字节数为 fBytes
flows[dstStr+"+"+srcStr].bytesDep = fBytes
if machineReadable {
// 打印捕获时间戳、本次rtt值、此流的最小值、字节数信息
fmt.Printf("%d.%06d %.6f %.6f %.0f %.0f %.0f", int64(capTm+float64(offTm)), int((capTm-float64(int64(capTm)))*1e6), rtt, fr.min, fBytes, dBytes, pBytes)
} else {
// 打印捕获时间、本次rtt值、此流的最小值、流的五元组
fmt.Printf("%s %s %s %s\n", captureTime.Format("15:04:05"), fmtTimeDiff(rtt), fmtTimeDiff(fr.min), fstr)
}
now := clockNow()
if now-nextFlush >= 0 {
nextFlush = now + flushInt
}
ti.t = -t // 将条目标记为已使用,避免再次保存这个 TSval
}
pktCnt++
}

清理过期数据

如果不清理,flowstsTbl中的数据会越来越多,最终撑爆。
我们遍历,删除过期的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 清理超期的数据
func cleanUp(n float64) {
// 如果 TSval 的时间超过 tsvalMaxAge,则删除条目
for k, ti := range tsTbl {
if capTm-math.Abs(ti.t) > float64(tsvalMaxAge)/float64(time.Second) {
delete(tsTbl, k)
}
}
for k, fr := range flows {
if n-fr.last_tm > float64(flowMaxIdle)/float64(time.Second) {
delete(flows, k)
flowCnt--
}
}
}

使用 pflag 解析参数

相对于标准库的 pflag, github.com/spf13/pflag功能更为强大。这里我们使用它解析参数,可以设置短参数和长参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var (
liveInp = pflag.StringP("interface", "i", "", "interface name")
fname = pflag.StringP("read", "r", "", "pcap captured file")
filterOpt = pflag.StringP("filter", "f", "", "pcap filter applied to packets")
)
func main() {
pflag.DurationVarP(&sumInt, "sumInt", "q", 10*time.Second, "interval to print summary reports to stderr")
pflag.BoolVarP(&filtLocal, "showLocal", "l", false, "show RTTs through local host applications")
pflag.DurationVarP(&timeToRun, "seconds", "s", 0*time.Second, "stop after capturing for <num> seconds")
pflag.IntVarP(&maxPackets, "count", "c", 0, "stop after capturing <num> packets")
pflag.BoolVarP(&machineReadable, "machine", "m", false, "machine readable output")
pflag.DurationVarP(&tsvalMaxAge, "tsvalMaxAge", "M", 10*time.Second, "max age of an unmatched tsval")
pflag.DurationVarP(&flowMaxIdle, "flowMaxIdle", "F", 300*time.Second, "flows idle longer than <num> are deleted")
pflag.Parse()
...
}

静态编译

差点忘了。
我们使用gopacket来捕获数据包,它依赖于libpcap。我们需要在编译时链接libpcap库。但是在不同的操作系统上,libpcap的位置和名称可能不同。为了解决这个问题,我们可以使用CGO来链接libpcap库,然后使用go build来编译我们的程序。

1
go build -o pping .

不过如果你使用ldd查看这个程序,你会发现它有很多依赖的动态库:

1
2
3
4
5
6
7
[root@cypress pping]# ldd pping
linux-vdso.so.1 => (0x00007ffcf33e1000)
libpcap.so.1 => /lib64/libpcap.so.1 (0x00007f4b81933000)
libresolv.so.2 => /lib64/libresolv.so.2 (0x00007f4b81719000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f4b814fd000)
libc.so.6 => /lib64/libc.so.6 (0x00007f4b8112f000)
/lib64/ld-linux-x86-64.so.2 (0x00007f4b81b74000)

我们可以采用静态链接的方式,这样编译出来的pping,可以轻松的复制到其他的Linux机器上运行,不需要安装libpcap库。

1
2
3
4
5
6
7
8
9
10
11
12
[root@cypress pping]# go build -ldflags "-linkmode external -extldflags -static" .
# github.com/smallnest/pping
/tmp/go-link-79680640/000006.o:在函数‘_cgo_97ab22c4dc7b_C2func_getaddrinfo’中:
/tmp/go-build/cgo-gcc-prolog:60: 警告:Using 'getaddrinfo' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
//usr/local/lib/libpcap.a(nametoaddr.o):在函数‘pcap_nametoaddr’中:
/root/libpcap-1.10.0/./nametoaddr.c:181: 警告:Using 'gethostbyname' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
//usr/local/lib/libpcap.a(nametoaddr.o):在函数‘pcap_nametonetaddr’中:
/root/libpcap-1.10.0/./nametoaddr.c:270: 警告:Using 'getnetbyname_r' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
//usr/local/lib/libpcap.a(nametoaddr.o):在函数‘pcap_nametoproto’中:
/root/libpcap-1.10.0/./nametoaddr.c:527: 警告:Using 'getprotobyname_r' in statically linked applications requires at runtime the shared libraries from the glibc version used for linking
[root@cypress pping]# ldd pping
不是动态可执行文件

它的使用方法和标准库的flag类似。这样我们就能保证和c++的pping工具一样的参数解析了。

基于"Rust重写一切"的哲学,我期望早点能看到大家用Rust实现的 pping。