在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。
客户端缓存是未来Redis最重要的特性。如果我们需要快速存储和快速缓存,那么我们就需要在客户端存储数据的子集。这是为了提供小延迟、大规模数据的想法的自然延伸。很多公司都采用了在客户端缓存数据以避免每次都请求redis,但是本地缓存和redis服务器数据之间有延迟,很难保证数据的一致性。Ben Malec在Redis Conf 2018上做了一个关于客户端缓存的演讲,给了Salvatore Sanfilippo以灵感,Salvatore Sanfilippo决定在Redis 6.0中支持客户端缓存的功能。但是为了支持这个功能,使用当前的redis协议很难实现,所以他设计了下一代的Redis协议: RESP3。
所有的代码都放在了github上。
RESP3协议
RESP3 协议中增加了很多新的数据类型。
和 RESP version 2 等价的类型
- Array: 一个有序集合,包含N个其它类型
- Blob string: 二进制安全字符串
- Simple string: 一个节省空间的非二进制安全字符串
- Simple error: 一个节省空间的非二进制安全错误码和错误信息
- Number: 有符号64位整数
RESP3中新加的类型
- Null: 单一的空值,代替原先的 RESP v2 的
*-1
和 $-1
空值。
- Double: 浮点数
- Boolean: 布尔类型 true / false
- Blob error: 二进制安全的错误码和错误信息
- Verbatim string: 一个二进制安全字符串,带文本格式, 如命令
LATENCY DOCTOR
的输出
- Map: 一个有序的键值对
- Set: 一个无序的不重复的集合
- Attribute: 类似Map类型
- Push: 带外数据,格式类似数组,但是客户端需要检查第一个数据,第一个数据指示了带外数据的类型。注意带外数据并不是一个reply,它是redis主动推送的数据,所以客户端收到带外数据,交给对应的处理方法去处理后,你还需要继续读取你的reply数据
- Hello: hello命令的返回结果,类似Map类型,仅仅在客户端和服务器建立连接的时候发送
- Big number: 大数字类型
还有一种新加的stream类型,可以用来传送不确定具体长度的数据。在数据的开头有固定的标识符,在数据传输完毕后在加上这个40字节的标识符,40字节的标志符基本上不会和传输的数据有重复:
1 2 3
| $EOF:<40 bytes marker><CR><LF> ... any number of bytes of data here not containing the marker ... <40 bytes marker>
|
它以$EOF:
开始接着是40字节标识符,回车换行,接着就是数据,数据结束后是40字节的标识符。因为这种数据类型开始接收时数据长度不确定,所以我们对这种数据类型处理比较特殊,我们会解析出第一行的40字节标志符,后续的读取以及结束标志符需要调用者自己去读取和验证。
可以看到出了保持和原先的RESP version 2的数据类型一致外, RESP3的数据增加了很多数据类型。因为客户端发送的命令的格式比较简单,基本上是字符串的数组形式,所以这么多类型主要用在对服务器返回数据的发送和解析上。
每种数据类型有一个特殊的字符作为标识flag
,我们为每个类型定义了一个常量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| const ( TypeBlobString = '$' TypeSimpleString = '+' TypeSimpleError = '-' TypeNumber = ':' TypeNull = '_' TypeDouble = ',' TypeBoolean = '#' TypeBlobError = '!' TypeVerbatimString = '=' TypeBigNumber = '(' TypeArray = '*' TypeMap = '%' TypeSet = '~' TypeAttribute = '|' TypePush = '>' TypeStream = "$EOF:" )
|
然后我们为所有的数据定义一个统一的模型,这样做的好处是我们可以使用统一的数据处理代码,避免复杂繁琐的数据类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| type Value struct { Type byte Str string StrFmt string Err string Integer int64 Boolean bool Double float64 BigInt *big.Int Elems []*Value // for array & set KV *linkedhashmap.Map Attrs *linkedhashmap.Map StreamMarker string }
|
Value
代表一个数据类型的值,它既可以是客户端发送的请求,也可以是服务器端的返回或者PUSH数据。
Type
是数据的类型标识,也就是那个特殊的字符。
对于不同的数据类型,只有部分的字段有意义,比如字符串类型,Str
字段有意义,Err
、Elems
、KV
等就没有意义了。Verbatim string
类型则Str
和StrFmt
有意义。 错误类型则Err
有意义。数组和Set类型Elems
有意义,Map和属性则KV
有意义。
NULL类型是没有值的,光靠Type就足够了。
因为RESP3中规定数据前面可以有多个属性,所以每个数据还都包含一个Attrs
字段,它表示这个数据的属性。
Stream类型只读取第一行,StreamMarker
字段表示它的40字节的标志符。
注意Map类型我们并没有使用Go标准库的Map,而是使用一个第三方的linkedhashmap.Map
库,原因在于RESP3规范中约定Map类型是有序的,而标准库的Map是基于Hash的无序的Map,所以不合适。
我们可以把这种数据表示成Redis传输的字符串。 首先我们要读取属性,看看这个值前面是否有属性,如果有的话先把属性编码。属性是一个Map类型,按照Map类型的方式编码就可以了。注意数量是键值对的数量。接下来按照不同的类型进行编码。对于复杂类型,我们对于它包含的值递归调用编码方法即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| func (r *Value) ToRESP3String() string { buf := new(strings.Builder) if r.Attrs != nil && r.Attrs.Size() > 0 { buf.WriteByte(TypeAttribute) buf.WriteString(strconv.Itoa(r.Attrs.Size())) buf.Write(CRLFByte) r.Attrs.Each(func(key, val interface{}) { k := key.(*Value) v := val.(*Value) buf.WriteByte(k.Type) k.toRESP3String(buf) buf.WriteByte(v.Type) v.toRESP3String(buf) }) } buf.WriteByte(r.Type) r.toRESP3String(buf) return buf.String() } func (r *Value) toRESP3String(buf *strings.Builder) { switch r.Type { case TypeSimpleString: buf.WriteString(r.Str) case TypeBlobString: ...... case TypeArray, TypeSet, TypePush: buf.WriteString(strconv.Itoa(len(r.Elems))) buf.Write(CRLFByte) for _, v := range r.Elems { buf.WriteByte(v.Type) v.toRESP3String(buf) } return ...... } }
|
有时候我们需要把数据返回成GO特性的类型,比如字符串、error、数组和Map等,我们也提供了一个SmartResult
的方法:
1 2 3 4 5 6 7 8 9
| func (r *Value) SmartResult() interface{} { switch r.Type { case TypeSimpleString: return r.Str case TypeBlobString: return r.Str ...... } }
|
Reader
上面定义了一个统一的数据类型Value
,那么如何从一个连接中读取这个Value
呢? 我们需要一个Reader
类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type Reader struct { *bufio.Reader } func NewReader(reader io.Reader) *Reader { return NewReaderSize(reader, 32*1024) } func NewReaderSize(reader io.Reader, size int) *Reader { return &Reader{ Reader: bufio.NewReaderSize(reader, size), } }
|
我们定义了一个Reader
,你可以指定它的buffer大小,它的ReadValue
方法可以从reader中读取Value
对象,我们使用io.Reader
作为源而不是net.Conn
,是因为我们使用更通用的接口可以方便测试。
首先我们要检查是否有属性,如果有属性,先把属性读取,属性是Map的数据类型,所以按照Map的方式处理就可以了。接下来读取真正的数据。
简单一行的数据比较好处理。比如简单字符串,简单error、数字、布尔值、空值等等,我们为每种类型都提供了一个独立的解析方法,便于管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (r *Reader) ReadValue() (*Value, []byte, error) { line, err := r.readLine() if err != nil { return nil, nil, err } if len(line) < 3 { return nil, nil, ErrInvalidSyntax } var attrs *linkedhashmap.Map if line[0] == TypeAttribute { attrs, err = r.readAttr(line) if err != nil { return nil, nil, err } line, err = r.readLine() } if line[0] == TypeBlobString && len(line) == 45 && bytes.Compare(line[:5], StreamMarkerPrefix) == 0 { return nil, line[5:], nil } v := &Value{ Type: line[0], Attrs: attrs, } switch v.Type { case TypeSimpleString: v.Str = string(line[1 : len(line)-2]) ...... } }
|
readLine
是读取一行的方法,Redis很多数据都是以回车换行符隔开的;getCount
从一行中读取数量值,比如数组的元素的数量等:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (r *Reader) readLine() (line []byte, err error) { line, err = r.ReadBytes('\n') if err != nil { return nil, err } if len(line) > 1 && line[len(line)-2] == '\r' { return line, nil } return nil, ErrInvalidSyntax } func (r *Reader) getCount(line []byte) (int, error) { end := bytes.IndexByte(line, '\r') return strconv.Atoi(string(line[1:end])) }
|
复杂类型的读取采用递归的方式解析,比如Map类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (r *Reader) readMap(line []byte) (*linkedhashmap.Map, error) { count, err := r.getCount(line) if err != nil { return nil, err } rt := linkedhashmap.New() for i := 0; i < count; i++ { k, streamMarkerPrefix, err := r.ReadValue() if err = isError(err, streamMarkerPrefix); err != nil { return nil, err } v, streamMarkerPrefix, err := r.ReadValue() if err = isError(err, streamMarkerPrefix); err != nil { return nil, err } rt.Put(k, v) } return rt, nil }
|
这样,我们一个底层的RESP3 Reader就实行了,你可以连接Redis 6.0 服务器, 然后从TCP连接中读取Value
返回值,根据不同的Type进行不同的处理,或者调用SmartResult
得到一个确定的值。
Writer
客户端的发送命令比较简单,因为发送的数据是一个字符串数组,所以编码成一个数组,数据的元素类型是字符串就可以。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| type Writer struct { *bufio.Writer } func NewWriter(writer io.Writer) *Writer { return &Writer{ Writer: bufio.NewWriter(writer), } } func (w *Writer) WriteCommand(args ...string) (err error) { w.WriteByte(TypeArray) w.WriteString(strconv.Itoa(len(args))) w.Write(CRLFByte) for _, arg := range args { w.WriteByte(TypeBlobString) w.WriteString(strconv.Itoa(len(arg))) w.Write(CRLFByte) w.WriteString(arg) w.Write(CRLFByte) } return w.Flush() }
|
这样,一个完整的RESP3的读写器就完成了。 有什么用?
你可以使用它和redis进行通讯,它支持目前还没有发布的redis 6.0。 你也可以基于它实现一个redis的Go client库,支持最新的redis 6.0的client库,可以支持接收PUSH消息,实现pipeline的机制,接收流数据等等。
你也可以使用它实现一个类似Codis的proxy,其中协议的解析就不用写了,直接使用它就可以。
客户端缓存
为了帮助客户端实现缓存,尽可能和redis的数据保持一致,redis需要一些额外的改进,这些额外的改进称之为tracking
。
key空间整体被分为很多的哈希槽,redis 6.0使用24比特位作为CRC64的输出,所以会有1600多万个不同的哈希槽。哈希槽的多少是一个tradeoff, 多了占用太多的内存空间,太少又容易引起惊群的现象。如果你有1亿个key,而在客户端缓存中,收到一个失效消息不应该影响过多的key。Redis用于存储invalidation表的内存开销为130MiB,一个1600万个条目,每个条目8字节的数组。这对我来说没问题,如果你想要这个功能,你就要充分利用客户端所有的内存,所以使用130MB服务器端内存作为代价;你所赢得的是更细粒度的失效。
客户端连接到redis服务器之后,要想打开这个特性,需要发送:
服务端回复+OK
表示正常,同时从那时开始,每一个只读命令,除了返回key对应的数据给调用者以外,还有一个副作用,就是记录所有客户端请求的key的哈希槽(但仅仅是对只读命令的key)。Redis存储这些信息的方法很简单。每个Redis客户端都有一个惟一的ID,因此,如果ID为123的客户端执行一个MGET 命令,它的keys对应的哈希槽是1、2和5,我们将得到一个包含以下条目的无效表:
1 2 3
| 1 -> [123] 2 -> [123] 5 -> [123]
|
稍后,ID为888的客户端来请求哈希槽5中的key,所以这个表变为:
现在,其他一些客户端在哈希槽5中更改了一些key。发生的情况是,Redis将检查Invalidation表,发现客户端123和888可能都在该槽上缓存了key。我们会向客户发送一个失效消息,他们可以有多种处理方式:要么记住哈希槽最新的失效时间戳,然后在使用的时候才检查缓存对象的失效时间戳,如果超出这个时间戳,则删除这个key,这称为lazy的方式。或者,客户端可以获取关于这个哈希槽的所有缓存内容,然后直接删除哈希槽即可。这种使用24位散列函数的方法不是问题,即使缓存了数千万个key,我们也不会有很长的哈希槽记录。在发送失效消息之后,服务端会从Invalidation表中删除条目,这样服务端将不再向这些客户端发送失效消息,直到它们再次读取该哈希槽内的key。
客户端也也可以有一些自己的设计,比如使用20比特位记录哈希槽,做好服务器的24比特位和20比特币之间的对应就好。
Redis是通过Push消息将失效消息发送给客户端的。你也可以使用另外一个客户端(1234)负责接收失效消息:
1
| CLIENT TRACKING on REDIRECT 1234
|
客户端ID可以通过CLIENT ID
请求获取。
接下来我们使用前面实现的读写器来验证TRACKING的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| conn, err := net.DialTimeout("tcp", "127.0.0.1:6379", 5*time.Second) if err != nil { t.Logf("can't found one of redis 6.0 server") return } defer conn.Close() w := NewWriter(conn) r := NewReader(conn) w.WriteCommand("HELLO", "3") helloResp, _, err := r.ReadValue() if err != nil { t.Fatalf("failed to send a HELLO 3") } if helloResp.KV.Size() == 0 { t.Fatalf("expect some info but got %+v", helloResp) } t.Logf("hello response: %c, %v", helloResp.Type, helloResp.SmartResult()) w.WriteCommand("CLIENT", "TRACKING", "on") resp, _, err := r.ReadValue() if err != nil { t.Fatalf("failed to TRACKING: %v", err) } t.Logf("TRACKING result: %c, %+v", resp.Type, resp.SmartResult()) w.WriteCommand("GET", "a") resp, _, err = r.ReadValue() if err != nil { t.Fatalf("failed to GET: %v", err) } t.Logf("GET result: %c, %+v", resp.Type, resp.SmartResult()) go func() { conn, err := net.DialTimeout("tcp", "127.0.0.1:9999", 5*time.Second) if err != nil { t.Logf("can't found one of redis 6.0 server") return } defer conn.Close() w := NewWriter(conn) r := NewReader(conn) hash := crc64([]byte("a")) & (TRACKING_TABLE_SIZE - 1) t.Logf("calculated hash: %d", hash) for i := 0; i < 10; i++ { w.WriteCommand("set", "a", strconv.Itoa(i)) resp, _, err = r.ReadValue() if err != nil { t.Fatalf("failed to set: %v", err) } t.Logf("set result: %c, %+v", resp.Type, resp.SmartResult()) time.Sleep(200 * time.Millisecond) } }() for i := 0; i < 10; i++ { resp, _, err = r.ReadValue() if err != nil { t.Fatalf("failed to receive a message: %v", err) } if resp.Type == TypePush && len(resp.Elems) >= 2 && resp.Elems[0].SmartResult().(string) == "invalidate" { t.Logf("received TRACKING result: %c, %+v", resp.Type, resp.SmartResult()) w.WriteCommand("GET", "a") resp, _, err = r.ReadValue() } }
|
参考文档
- https://github.com/antirez/redis/blob/unstable/src/tracking.c
- http://antirez.com/news/130
- https://github.com/antirez/RESP3/blob/master/spec.md
- https://github.com/smallnest/resp3
- https://www.redisgreen.net/blog/reading-and-writing-redis-protocol/