从头再读取 io.Reader: 覆水难收?

前几天,我们百度的同学分享了Go标准库中一段好玩的好玩的代码, net/http/response.go中一段检查HTTP的headser中Content-Length未设置的情况下,对http.Body的有趣的处理。

我们知道io.Reader提供了Read方法,并没有将读取的数据再塞回去的方法,而且对于流式的Reader,也绝无可能将数据塞回去。就像覆水难收一样,泼出去的水,没办法收回来了。

如果我们想从Reader读取一部分字节,做一些处理(一般是做一些检查),然后想再让调用者从头开始读取咋办?

net/http/response.go中就有这么一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Clone it, so we can modify r1 as needed.
r1 := new(Response)
*r1 = *r
if r1.ContentLength == 0 && r1.Body != nil {
// Is it actually 0 length? Or just unknown?
var buf [1]byte
n, err := r1.Body.Read(buf[:])
if err != nil && err != io.EOF {
return err
}
if n == 0 {
// Reset it to a known zero reader, in case underlying one
// is unhappy being read repeatedly.
r1.Body = NoBody
} else {
r1.ContentLength = -1
r1.Body = struct {
io.Reader
io.Closer
}{
io.MultiReader(bytes.NewReader(buf[:1]), r.Body),
r.Body,
}
}
}

这段代码主要是针对响应(Response)对象做了一次克隆(Clone),目的是为了能够安全地修改响应对象,而不影响原始的响应对象。

具体来看:

  1. r1 := new(Response) 创建一个新的响应对象
  2. r1 = r 让r1成为原始响应r的克隆
  3. 接下来判断如果响应内容长度r1.ContentLength == 0 且响应体r1.Body != nil
  4. 这说明内容长度标记为0,但实际上响应体不为空。这种情况下无法确定内容究竟是0长度还是长度未知。
  5. 所以读取1字节到buf,以判断响应体是否真是0长度。
  6. 如果读取到EOF,说明响应体确实长度为0,将Body重置为NoBody。
  7. 否则说明长度未知,将ContentLength设置为-1,并用MultiReader将已读取的1字节内容和原Body组合,作为新Body。
  8. 这样通过克隆的响应对象r1,可以安全地修改ContentLength和Body,而不影响原始响应对象r。

这段代码通过克隆请求对象,巧妙地处理了内容长度标记为0但实际有内容的情况,避免了对原始响应对象的修改。

它先读取了1个字节,来判断Body是否为空,不为空在通过io.MultiReader(bytes.NewReader(buf[:1]), r.Body),把这一个字节和原来的io.Reader(r.Body)在捏合在一起,形成一个新的io.Reader。

通过io.MultiReader新建一个 io.Reader,就可以把已读取的字节和剩余未读取的字节组合起来,形成都未读取的Reader。

标准库net/http/transfer.go中也有一段相同的逻辑处理。

这让我想起了soheilhy/cmux, rpcx最早使用它在一个端口上提供不同传输的协议。

cmux也是预先读取一部分数据,和预先配置的Matcher进行匹配,如果匹配成功,比如HTTP1.1协议,那么这个连接后续就按照HTTP1.1协议进行解析。那么预先读取的这些字节也得交给解析器从头开始解析,否则数据就缺失了,那么它是怎么实现的呢?

cmux使用老二另外一个方法,它创建了一个bufferedReader:

1
2
3
4
5
6
7
8
type bufferedReader struct {
source io.Reader
buffer bytes.Buffer
bufferRead int
bufferSize int
sniffing bool
lastErr error
}

当连接开始在侦探和哪个Matcher匹配的时候, conn连接会把数据写入到这个buffer中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *bufferedReader) Read(p []byte) (int, error) {
if s.bufferSize > s.bufferRead { // buffer中有未读的数据,先读取这个
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
return bn, s.lastErr
} else if !s.sniffing && s.buffer.Cap() != 0 {
s.buffer = bytes.Buffer{}
}
// 从原始的conn中读取
sn, sErr := s.source.Read(p)
if sn > 0 && s.sniffing { // 如果还在侦探状态,把读取的数据写入到buffer中
s.lastErr = sErr
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return wn, wErr
}
}
return sn, sErr
}

一旦侦探完成(match一个协议),那么就会把读取的指针置为最开始的地方,从头开始读取,根据上面的方法的逻辑,读取完buffer就从原始conn中读取,也不会再往buffer中写。

1
2
3
4
5
6
7
8
9
func (m *MuxConn) doneSniffing() {
m.buf.reset(false)
}
func (s *bufferedReader) reset(snif bool) {
s.sniffing = snif
s.bufferRead = 0 // 退回到原点,从最开始的数据开始读
s.bufferSize = s.buffer.Len() // 已读取的数据
}

通过这种方式,也实现了预读取的功能。

看起来,在Go语言中,还真的能覆水再回收。