读写 Redis RESP3 协议以及Redis 6.0客户端缓存

在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。

客户端缓存是未来Redis最重要的特性。如果我们需要快速存储和快速缓存,那么我们就需要在客户端存储数据的子集。这是为了提供小延迟、大规模数据的想法的自然延伸。很多公司都采用了在客户端缓存数据以避免每次都请求redis,但是本地缓存和redis服务器数据之间有延迟,很难保证数据的一致性。Ben Malec在Redis Conf 2018上做了一个关于客户端缓存的演讲,给了Salvatore Sanfilippo以灵感,Salvatore Sanfilippo决定在Redis 6.0中支持客户端缓存的功能。但是为了支持这个功能,使用当前的redis协议很难实现,所以他设计了下一代的Redis协议: RESP3

所有的代码都放在了github上。

RESP3协议

RESP3 协议中增加了很多新的数据类型。

和 RESP version 2 等价的类型

  • Array: 一个有序集合,包含N个其它类型
  • Blob string: 二进制安全字符串
  • Simple string: 一个节省空间的非二进制安全字符串
  • Simple error: 一个节省空间的非二进制安全错误码和错误信息
  • Number: 有符号64位整数

RESP3中新加的类型

  • Null: 单一的空值,代替原先的 RESP v2 的*-1$-1 空值。
  • Double: 浮点数
  • Boolean: 布尔类型 true / false
  • Blob error: 二进制安全的错误码和错误信息
  • Verbatim string: 一个二进制安全字符串,带文本格式, 如命令LATENCY DOCTOR的输出
  • Map: 一个有序的键值对
  • Set: 一个无序的不重复的集合
  • Attribute: 类似Map类型
  • Push: 带外数据,格式类似数组,但是客户端需要检查第一个数据,第一个数据指示了带外数据的类型。注意带外数据并不是一个reply,它是redis主动推送的数据,所以客户端收到带外数据,交给对应的处理方法去处理后,你还需要继续读取你的reply数据
  • Hello: hello命令的返回结果,类似Map类型,仅仅在客户端和服务器建立连接的时候发送
  • Big number: 大数字类型

还有一种新加的stream类型,可以用来传送不确定具体长度的数据。在数据的开头有固定的标识符,在数据传输完毕后在加上这个40字节的标识符,40字节的标志符基本上不会和传输的数据有重复:

1
2
3
$EOF:<40 bytes marker><CR><LF>
... any number of bytes of data here not containing the marker ...
<40 bytes marker>

它以$EOF:开始接着是40字节标识符,回车换行,接着就是数据,数据结束后是40字节的标识符。因为这种数据类型开始接收时数据长度不确定,所以我们对这种数据类型处理比较特殊,我们会解析出第一行的40字节标志符,后续的读取以及结束标志符需要调用者自己去读取和验证。

可以看到出了保持和原先的RESP version 2的数据类型一致外, RESP3的数据增加了很多数据类型。因为客户端发送的命令的格式比较简单,基本上是字符串的数组形式,所以这么多类型主要用在对服务器返回数据的发送和解析上。

每种数据类型有一个特殊的字符作为标识flag,我们为每个类型定义了一个常量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const (
// simple types
TypeBlobString = '$' // $<length>\r\n<bytes>\r\n
TypeSimpleString = '+' // +<string>\r\n
TypeSimpleError = '-' // -<string>\r\n
TypeNumber = ':' // :<number>\r\n
TypeNull = '_' // _\r\n
TypeDouble = ',' // ,<floating-point-number>\r\n
TypeBoolean = '#' // #t\r\n or #f\r\n
TypeBlobError = '!' // !<length>\r\n<bytes>\r\n
TypeVerbatimString = '=' // =<length>\r\n<format(3 bytes):><bytes>\r\n
TypeBigNumber = '(' // (<big number>\n
// Aggregate data types
TypeArray = '*' // *<elements number>\r\n... numelements other types ...
TypeMap = '%' // %<elements number>\r\n... numelements key/value pair of other types ...
TypeSet = '~' // ~<elements number>\r\n... numelements other types ...
TypeAttribute = '|' // |~<elements number>\r\n... numelements map type ...
TypePush = '>' // ><elements number>\r\n<first item is String>\r\n... numelements-1 other types ...
//special type
TypeStream = "$EOF:" // $EOF:<40 bytes marker><CR><LF>... any number of bytes of data here not containing the marker ...<40 bytes marker>
)

然后我们为所有的数据定义一个统一的模型,这样做的好处是我们可以使用统一的数据处理代码,避免复杂繁琐的数据类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Value struct {
Type byte
Str string
StrFmt string
Err string
Integer int64
Boolean bool
Double float64
BigInt *big.Int
Elems []*Value // for array & set
KV *linkedhashmap.Map
Attrs *linkedhashmap.Map
StreamMarker string
}

Value代表一个数据类型的值,它既可以是客户端发送的请求,也可以是服务器端的返回或者PUSH数据。

Type是数据的类型标识,也就是那个特殊的字符。

对于不同的数据类型,只有部分的字段有意义,比如字符串类型,Str字段有意义,ErrElemsKV等就没有意义了。Verbatim string类型则StrStrFmt有意义。 错误类型则Err有意义。数组和Set类型Elems有意义,Map和属性则KV有意义。

NULL类型是没有值的,光靠Type就足够了。

因为RESP3中规定数据前面可以有多个属性,所以每个数据还都包含一个Attrs字段,它表示这个数据的属性。

Stream类型只读取第一行,StreamMarker字段表示它的40字节的标志符。

注意Map类型我们并没有使用Go标准库的Map,而是使用一个第三方的linkedhashmap.Map库,原因在于RESP3规范中约定Map类型是有序的,而标准库的Map是基于Hash的无序的Map,所以不合适。

我们可以把这种数据表示成Redis传输的字符串。 首先我们要读取属性,看看这个值前面是否有属性,如果有的话先把属性编码。属性是一个Map类型,按照Map类型的方式编码就可以了。注意数量是键值对的数量。接下来按照不同的类型进行编码。对于复杂类型,我们对于它包含的值递归调用编码方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (r *Value) ToRESP3String() string {
buf := new(strings.Builder)
//check attributes
if r.Attrs != nil && r.Attrs.Size() > 0 {
buf.WriteByte(TypeAttribute)
buf.WriteString(strconv.Itoa(r.Attrs.Size()))
buf.Write(CRLFByte)
r.Attrs.Each(func(key, val interface{}) {
k := key.(*Value)
v := val.(*Value)
buf.WriteByte(k.Type)
k.toRESP3String(buf)
buf.WriteByte(v.Type)
v.toRESP3String(buf)
})
}
buf.WriteByte(r.Type)
r.toRESP3String(buf)
return buf.String()
}
func (r *Value) toRESP3String(buf *strings.Builder) {
switch r.Type {
case TypeSimpleString:
buf.WriteString(r.Str)
case TypeBlobString:
......
case TypeArray, TypeSet, TypePush:
buf.WriteString(strconv.Itoa(len(r.Elems)))
buf.Write(CRLFByte)
for _, v := range r.Elems {
buf.WriteByte(v.Type)
v.toRESP3String(buf)
}
return
......
}
}

有时候我们需要把数据返回成GO特性的类型,比如字符串、error、数组和Map等,我们也提供了一个SmartResult的方法:

1
2
3
4
5
6
7
8
9
func (r *Value) SmartResult() interface{} {
switch r.Type {
case TypeSimpleString:
return r.Str
case TypeBlobString:
return r.Str
......
}
}

Reader

上面定义了一个统一的数据类型Value,那么如何从一个连接中读取这个Value呢? 我们需要一个Reader类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Reader struct {
*bufio.Reader
}
// NewReader returns a RESP3 reader.
func NewReader(reader io.Reader) *Reader {
return NewReaderSize(reader, 32*1024)
}
// NewReaderSize returns a new Reader whose buffer has at least the specified size.
func NewReaderSize(reader io.Reader, size int) *Reader {
return &Reader{
Reader: bufio.NewReaderSize(reader, size),
}
}

我们定义了一个Reader,你可以指定它的buffer大小,它的ReadValue方法可以从reader中读取Value对象,我们使用io.Reader作为源而不是net.Conn,是因为我们使用更通用的接口可以方便测试。

首先我们要检查是否有属性,如果有属性,先把属性读取,属性是Map的数据类型,所以按照Map的方式处理就可以了。接下来读取真正的数据。

简单一行的数据比较好处理。比如简单字符串,简单error、数字、布尔值、空值等等,我们为每种类型都提供了一个独立的解析方法,便于管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (r *Reader) ReadValue() (*Value, []byte, error) {
line, err := r.readLine()
if err != nil {
return nil, nil, err
}
if len(line) < 3 {
return nil, nil, ErrInvalidSyntax
}
var attrs *linkedhashmap.Map
if line[0] == TypeAttribute {
attrs, err = r.readAttr(line)
if err != nil {
return nil, nil, err
}
line, err = r.readLine()
}
// check stream. if it is stream, return the stream marker
if line[0] == TypeBlobString && len(line) == 45 && bytes.Compare(line[:5], StreamMarkerPrefix) == 0 {
return nil, line[5:], nil
}
v := &Value{
Type: line[0],
Attrs: attrs,
}
switch v.Type {
case TypeSimpleString:
v.Str = string(line[1 : len(line)-2])
......
}
}

readLine是读取一行的方法,Redis很多数据都是以回车换行符隔开的;getCount从一行中读取数量值,比如数组的元素的数量等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (r *Reader) readLine() (line []byte, err error) {
line, err = r.ReadBytes('\n')
if err != nil {
return nil, err
}
if len(line) > 1 && line[len(line)-2] == '\r' {
return line, nil
}
return nil, ErrInvalidSyntax
}
func (r *Reader) getCount(line []byte) (int, error) {
end := bytes.IndexByte(line, '\r')
return strconv.Atoi(string(line[1:end]))
}

复杂类型的读取采用递归的方式解析,比如Map类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (r *Reader) readMap(line []byte) (*linkedhashmap.Map, error) {
count, err := r.getCount(line)
if err != nil {
return nil, err
}
rt := linkedhashmap.New()
for i := 0; i < count; i++ {
k, streamMarkerPrefix, err := r.ReadValue()
if err = isError(err, streamMarkerPrefix); err != nil {
return nil, err
}
v, streamMarkerPrefix, err := r.ReadValue()
if err = isError(err, streamMarkerPrefix); err != nil {
return nil, err
}
rt.Put(k, v)
}
return rt, nil
}

这样,我们一个底层的RESP3 Reader就实行了,你可以连接Redis 6.0 服务器, 然后从TCP连接中读取Value返回值,根据不同的Type进行不同的处理,或者调用SmartResult得到一个确定的值。

Writer

客户端的发送命令比较简单,因为发送的数据是一个字符串数组,所以编码成一个数组,数据的元素类型是字符串就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type Writer struct {
*bufio.Writer
}
// NewWriter returns a redis client writer.
func NewWriter(writer io.Writer) *Writer {
return &Writer{
Writer: bufio.NewWriter(writer),
}
}
// WriteCommand write a redis command.
func (w *Writer) WriteCommand(args ...string) (err error) {
// write the array flag
w.WriteByte(TypeArray)
w.WriteString(strconv.Itoa(len(args)))
w.Write(CRLFByte)
// write blobstring
for _, arg := range args {
w.WriteByte(TypeBlobString)
w.WriteString(strconv.Itoa(len(arg)))
w.Write(CRLFByte)
w.WriteString(arg)
w.Write(CRLFByte)
}
return w.Flush()
}

这样,一个完整的RESP3的读写器就完成了。 有什么用?

你可以使用它和redis进行通讯,它支持目前还没有发布的redis 6.0。 你也可以基于它实现一个redis的Go client库,支持最新的redis 6.0的client库,可以支持接收PUSH消息,实现pipeline的机制,接收流数据等等。

你也可以使用它实现一个类似Codis的proxy,其中协议的解析就不用写了,直接使用它就可以。

客户端缓存

为了帮助客户端实现缓存,尽可能和redis的数据保持一致,redis需要一些额外的改进,这些额外的改进称之为tracking

key空间整体被分为很多的哈希槽,redis 6.0使用24比特位作为CRC64的输出,所以会有1600多万个不同的哈希槽。哈希槽的多少是一个tradeoff, 多了占用太多的内存空间,太少又容易引起惊群的现象。如果你有1亿个key,而在客户端缓存中,收到一个失效消息不应该影响过多的key。Redis用于存储invalidation表的内存开销为130MiB,一个1600万个条目,每个条目8字节的数组。这对我来说没问题,如果你想要这个功能,你就要充分利用客户端所有的内存,所以使用130MB服务器端内存作为代价;你所赢得的是更细粒度的失效。

客户端连接到redis服务器之后,要想打开这个特性,需要发送:

1
CLIENT TRACKING on

服务端回复+OK表示正常,同时从那时开始,每一个只读命令,除了返回key对应的数据给调用者以外,还有一个副作用,就是记录所有客户端请求的key的哈希槽(但仅仅是对只读命令的key)。Redis存储这些信息的方法很简单。每个Redis客户端都有一个惟一的ID,因此,如果ID为123的客户端执行一个MGET 命令,它的keys对应的哈希槽是1、2和5,我们将得到一个包含以下条目的无效表:

1
2
3
1 -> [123]
2 -> [123]
5 -> [123]

稍后,ID为888的客户端来请求哈希槽5中的key,所以这个表变为:

1
5 -> [123, 888]

现在,其他一些客户端在哈希槽5中更改了一些key。发生的情况是,Redis将检查Invalidation表,发现客户端123和888可能都在该槽上缓存了key。我们会向客户发送一个失效消息,他们可以有多种处理方式:要么记住哈希槽最新的失效时间戳,然后在使用的时候才检查缓存对象的失效时间戳,如果超出这个时间戳,则删除这个key,这称为lazy的方式。或者,客户端可以获取关于这个哈希槽的所有缓存内容,然后直接删除哈希槽即可。这种使用24位散列函数的方法不是问题,即使缓存了数千万个key,我们也不会有很长的哈希槽记录。在发送失效消息之后,服务端会从Invalidation表中删除条目,这样服务端将不再向这些客户端发送失效消息,直到它们再次读取该哈希槽内的key。

客户端也也可以有一些自己的设计,比如使用20比特位记录哈希槽,做好服务器的24比特位和20比特币之间的对应就好。

Redis是通过Push消息将失效消息发送给客户端的。你也可以使用另外一个客户端(1234)负责接收失效消息:

1
CLIENT TRACKING on REDIRECT 1234

客户端ID可以通过CLIENT ID请求获取。

接下来我们使用前面实现的读写器来验证TRACKING的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 首先连接一个redis 6.0服务器,只有redis 6.0的服务器才开始支持TRACKING
conn, err := net.DialTimeout("tcp", "127.0.0.1:6379", 5*time.Second)
if err != nil {
t.Logf("can't found one of redis 6.0 server")
return
}
defer conn.Close()
w := NewWriter(conn)
r := NewReader(conn)
// 告诉redis采用RESP3的协议
w.WriteCommand("HELLO", "3")
helloResp, _, err := r.ReadValue()
if err != nil {
t.Fatalf("failed to send a HELLO 3")
}
if helloResp.KV.Size() == 0 {
t.Fatalf("expect some info but got %+v", helloResp)
}
t.Logf("hello response: %c, %v", helloResp.Type, helloResp.SmartResult())
// 通知服务器开始追踪
w.WriteCommand("CLIENT", "TRACKING", "on")
resp, _, err := r.ReadValue()
if err != nil {
t.Fatalf("failed to TRACKING: %v", err)
}
t.Logf("TRACKING result: %c, %+v", resp.Type, resp.SmartResult())
// 请求一次,服务器应该计算出哈希槽,并且关联这个哈希槽和这个连接
w.WriteCommand("GET", "a")
resp, _, err = r.ReadValue()
if err != nil {
t.Fatalf("failed to GET: %v", err)
}
t.Logf("GET result: %c, %+v", resp.Type, resp.SmartResult())
// 启动另外一个连接,模拟更新数据
go func() {
conn, err := net.DialTimeout("tcp", "127.0.0.1:9999", 5*time.Second)
if err != nil {
t.Logf("can't found one of redis 6.0 server")
return
}
defer conn.Close()
w := NewWriter(conn)
r := NewReader(conn)
// 根据key计算出的哈希槽的哈希,服务器的PUSH消息应该推送这个槽的hash
hash := crc64([]byte("a")) & (TRACKING_TABLE_SIZE - 1)
t.Logf("calculated hash: %d", hash)
for i := 0; i < 10; i++ {
// 模拟更新数据
w.WriteCommand("set", "a", strconv.Itoa(i))
resp, _, err = r.ReadValue()
if err != nil {
t.Fatalf("failed to set: %v", err)
}
t.Logf("set result: %c, %+v", resp.Type, resp.SmartResult())
time.Sleep(200 * time.Millisecond)
}
}()
for i := 0; i < 10; i++ {
// 读取一个PUSH数据
resp, _, err = r.ReadValue()
if err != nil {
t.Fatalf("failed to receive a message: %v", err)
}
if resp.Type == TypePush && len(resp.Elems) >= 2 && resp.Elems[0].SmartResult().(string) == "invalidate" {
t.Logf("received TRACKING result: %c, %+v", resp.Type, resp.SmartResult())
// 推送消息后,服务器就不再关联这个哈希槽和这个连接了,所以我们需要在拉取一次数据,以便继续跟踪
w.WriteCommand("GET", "a")
resp, _, err = r.ReadValue()
}
}

参考文档

  1. https://github.com/antirez/redis/blob/unstable/src/tracking.c
  2. http://antirez.com/news/130
  3. https://github.com/antirez/RESP3/blob/master/spec.md
  4. https://github.com/smallnest/resp3
  5. https://www.redisgreen.net/blog/reading-and-writing-redis-protocol/