命令分发模式

命令分发模式 (command dispatcher pattern)不属于23种经典的设计模式。它是一种不太为人所知的设计模式,它主要用于构建可扩展、可插拔的系统架构,将请求与执行请求的操作对象解耦。它类似于命令模式(Command Pattern),但更加灵活和动态。

虽然Command Dispatch Pattern不属于那23种经典模式,但它确实是一种很有价值的模式,可以应用于需要在运行时动态添加、修改或删除操作的系统中,使系统更加灵活和可扩展。

这种模式通过允许方便的添加、替换或移除任何命令处理器, 非常的灵活,将命令调用和命令处理解耦。而且每个命令可以由单独的命令处理器处理,代码组织和维护也很方便。

实际上,对于 Gopher 来讲,必然已经接触到这个模式了,只不过少有人指出或者梳理这种模式,但是在标准库和一些知名的项目中,其实已经自然的应用了,而且看起来整个架构也非常的清爽。

让我们看一个标准库实现 web 服务的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main
import (
"fmt"
"net/http"
)
// 定义 HTTP 处理程序函数
func homeHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "Welcome to the home page!")
}
func aboutHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "This is the about page.")
}
func main() {
// 创建一个新的 ServeMux 实例
mux := http.NewServeMux()
// 注册处理程序函数
mux.HandleFunc("/", homeHandler)
mux.HandleFunc("/about", aboutHandler)
// 启动 HTTP 服务器
fmt.Println("Starting server at :8080")
http.ListenAndServe(":8080", mux)
}

在这个示例中,我们可以把 http.ServeMux 看作是 Command Dispatcher,而 HTTP 请求就是一个命令。

通过 mux.HandleFunc ,将命令 (path,请求路径) 与命令处理程序 (handler, 处理程序函数) 进行绑定的过程。

新增一个命令,就注册一个新的处理程序。
移除一条命令,就删除那一条绑定语句。
修改一条命令,就替换对应的处理程序。

Http server 收到 HTTP 请求后,解析出路径信息,然后从注册的信息中找到这个路径对应的处理程序,然后调用这个处理程序。

这是一个经典的命令分发模式的应用。对于贝尔实验室出来的 Rob Pike、Russ Cox 来说,它们和 GoF 设计模式的这一派属于两个门派,所以在 Go 语言中 Rob Pike、Russ Cox 很少会讲到面向对象的设计模式,但是这种 HTTP 的这种实现方法我们可以把它归类为命令分发模式,而且是一个标准的模式实现。

接下来,我再给你介绍几种知名项目中使用这种模式的例子。毕竟,这种设计模式应用的场景之一就是微服务:

  • 分布式系统:在分布式系统中,命令分发模式可以用于将命令分配到不同的服务或节点进行处理。
  • 微服务架构:在微服务架构中,命令分发模式可以用于协调不同微服务之间的命令处理。
  • 复杂应用:在复杂应用中,命令分发模式可以用于解耦命令的发出者和处理者,提高系统的灵活性和可维护性。

Rpc 中处理

Go 生态圈中知名的微服务框架之一 rpcx 实现了两种命令分发方式:

  • 类似 Go 标准库的 rpc, 通过发射的方式找到对应的微服务方法,然后调用
  • 第二种是类似 Go 标准库这种路由绑定的方法,通过绑定 handler 方式

这两种方式都属于命令分发的设计模式,但是第二种更直观。比如下面一个微服务的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main
import (
"flag"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func mul(ctx *server.Context) error {
var args example.Args
err := ctx.Bind(&args)
if err != nil {
return err
}
var reply example.Reply
reply.C = args.A * args.B
ctx.Write(reply)
return nil
}
func main() {
flag.Parse()
s := server.NewServer()
s.AddHandler("Arith", "Mul", mul)
err := s.Serve("tcp", *addr)
if err != nil {
panic(err)
}
}

这里例子以一个 乘法 的微服务做例子。函数 mul 是命令(微服务)处理函数。s.AddHandler 将微服务和命令处理函数进行绑定。这和 HTTP 的例子非常的类似。

server 就是一个分发器,它收到客户端的微服务请求,解析出微服务的名称 (名字和方法名),找到注册的 handler, 把参数传给 handler 去处理。

lesismal 实现的一个高性能的 Go 微服务框架 arpc 也是采用的这种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"log"
"github.com/lesismal/arpc"
)
func main() {
svr := arpc.NewServer()
// register router
svr.Handler.Handle("/echo/sync", func(ctx *arpc.Context) {
str := ""
err := ctx.Bind(&str)
ctx.Write(str)
log.Printf("/echo/sync: \"%v\", error: %v", str, err)
})
// register router
svr.Handler.Handle("/echo/async", func(ctx *arpc.Context) {
str := ""
err := ctx.Bind(&str)
go ctx.Write(str)
log.Printf("/echo/async: \"%v\", error: %v", str, err)
})
svr.Run("localhost:8888")
}

处理微服务、web 路由等这些场景。在基础架构的软件开发中,这种模式对于实现命令式的基础服务组件,也非常的合适,接下来我给你介绍实现自研 memcached 和 redis 服务的基于命令分发模式实现的架构。

在自研基础架构产品中的应用

自研类 memcached 的缓存系统

Memcached是一种分布式内存对象缓存系统,用于加速动态Web应用程序的响应速度。它基于一种高效的基于内存的键值对存储,设计用于缓存小的数据块。

Memcached的主要优势包括:

  • 高性能:基于内存操作,能够提供非常高的读写性能。
  • 减少服务器负载:通过缓存数据减轻了数据库的访问压力。
  • 可扩展性:支持分布式集群部署,能够线性扩展。

Memcached的作者是Brad Fitzpatrick。也曾是 Go 开发团队成员之一,维护 Go HTTP 库等。也是 Go memcached client 库 bradfitz/gomemcache: 的作者。

这里我们介绍的是 Go memcached 服务端的库,在你实现类似 Memcached 服务的时候很有用。

Memcached 有文本和二进制两种协议,这里我们介绍的是文本协议,它比较简单,而且也方便使用 telenet 等命令测试。下面是一些常用的Memcached命令:

  1. 存储命令:
    • set key flags exptime bytes [noreply]: 存储一个键值对,并设置可选的标志、过期时间、数据长度和noreply。
    • add key flags exptime bytes [noreply]: 仅当键不存在时添加一个新的键值对。
    • replace key flags exptime bytes [noreply]: 仅当键已存在时替换键的值。
  2. 检索命令:
    • get key [key ...]: 获取一个或多个键的值。
    • gets key: 获取带有CAS令牌的键值对,用于检查并设置操作。
  3. 操作命令:
    • incr key value [noreply]: 将键的数值增加给定的值。
    • decr key value [noreply]: 将键的数值减少给定的值。
    • append key flags exptime bytes [noreply]: 将数据追加到一个已存在的键的值中。
    • prepend key flags exptime bytes [noreply]: 将数据添加到一个现有键的值的开始部分。
    • cas keyflags exptime bytes unique-cas-token [noreply]: 使用CAS令牌实现检查并设置操作。
  4. 删除命令:
    • delete key [noreply]: 删除一个键值对。
  5. 统计命令:
    • stats: 获取Memcached服务器统计信息。
    • stats reset: 重置Memcached服务器统计信息。
  6. 其他命令:
    • flush_all [delay] [noreply]: 清空整个Memcached服务器中的所有键值对数据。
    • version: 获取Memcached服务器版本信息。
    • quit: 关闭Memcached连接。

这些命令通过TCP连接以文本形式发送给Memcached服务器,服务器也以文本形式返回响应结果。例如,成功的响应以"OK"开头,错误响应以"ERROR"或"SERVER_ERROR"开头。

smallnest/gomemcached 是实现 memcache server 端的一个库,我们来看它的一个简单例子:

1
2
3
4
5
6
7
8
9
mockServer = NewServer(addr)
mockServer.RegisterFunc("get", DefaultGet)
mockServer.RegisterFunc("gets", DefaultGet)
mockServer.RegisterFunc("set", DefaultSet)
mockServer.RegisterFunc("delete", DefaultDelete)
mockServer.RegisterFunc("incr", DefaultIncr)
mockServer.RegisterFunc("flush_all", DefaultFlushAll)
mockServer.RegisterFunc("version", DefaultVersion)
mockServer.Start()

Server 是一个命令分发器,你可以注册你实现的命令处理函数。
你甚至扩展,为你的缓存产品增加 memcached 不支持的命令,比如 auth 等等。

自研类 Redis 的系统

如果你要开发类似 Redis 的服务,也有一个非常知名的库,甚至可以说是 Go 生态圈的首选,就是 tidwall/redcon

它基于命令分发模式,提供了一个通用的 Redis 服务端框架,下面是一个它的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"log"
"strings"
"sync"
"github.com/tidwall/redcon"
)
var addr = ":6380"
func main() {
var mu sync.RWMutex
var items = make(map[string][]byte)
var ps redcon.PubSub
go log.Printf("started server at %s", addr)
err := redcon.ListenAndServe(addr,
func(conn redcon.Conn, cmd redcon.Command) {
switch strings.ToLower(string(cmd.Args[0])) {
default:
conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
case "ping":
conn.WriteString("PONG")
case "quit":
conn.WriteString("OK")
conn.Close()
case "set":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.Lock()
items[string(cmd.Args[1])] = cmd.Args[2]
mu.Unlock()
conn.WriteString("OK")
case "get":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.RLock()
val, ok := items[string(cmd.Args[1])]
mu.RUnlock()
if !ok {
conn.WriteNull()
} else {
conn.WriteBulk(val)
}
case "del":
if len(cmd.Args) != 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
mu.Lock()
_, ok := items[string(cmd.Args[1])]
delete(items, string(cmd.Args[1]))
mu.Unlock()
if !ok {
conn.WriteInt(0)
} else {
conn.WriteInt(1)
}
case "publish":
if len(cmd.Args) != 3 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
conn.WriteInt(ps.Publish(string(cmd.Args[1]), string(cmd.Args[2])))
case "subscribe", "psubscribe":
if len(cmd.Args) < 2 {
conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
return
}
command := strings.ToLower(string(cmd.Args[0]))
for i := 1; i < len(cmd.Args); i++ {
if command == "psubscribe" {
ps.Psubscribe(conn, string(cmd.Args[i]))
} else {
ps.Subscribe(conn, string(cmd.Args[i]))
}
}
}
},
func(conn redcon.Conn) bool {
// Use this function to accept or deny the connection.
// log.Printf("accept: %s", conn.RemoteAddr())
return true
},
func(conn redcon.Conn, err error) {
// This is called when the connection has been closed
// log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err)
},
)
if err != nil {
log.Fatal(err)
}
}

redcon.ListenAndServe 是一个命令分发器,你可以注册你实现的命令处理函数。
在上面的例子中,每一个switch case是一种命令的处理,这里的例子很简单,就没有把每种命令的处理逻辑写成一个个独立的handler,而是直接在case中直接处理。如果我们要实现一个类Redis系统,最好的方式是把每种命令的处理逻辑写成一个个独立的handler,这样代码更清晰,更易维护。
这些handler最好按照redis命令的分类,分成几个文件,分别处理string、set等类型。

这样的设计,不仅符合命令分发模式,而且符合单一职责原则,代码更易维护。

从上面的几个例子来看,命令分发模式在实现类似 Memcached、Redis 这种基础架构产品中非常适用,它可以帮助我们实现一个灵活、可扩展的系统架构。本周的另外一篇文章,介绍一个基于SQLite的类Redisa的实现,也是采用了命令分发模式,请关注“鸟窝聊技术”公众号,及时获取最新的文章。