用最简单的方式实现 Rust RPC 服务

RPC是对进程内函数调用的分布式开展,它将进程内的函数调用,扩展成对远程机器上的相应的函数的调用。如何使用最少的代码,用最容易的方式调用远程函数呢?

首先需要知道远程机器的IP地址和端口,然后呢,使用原先进程内的方式直接调用即可,这是最容易的RPC调用。容易不意味着功能简单,而是rpc库背后默默为你承担了序列化、网络传输、路由、服务调用、服务治理的功能。

rpcx秉承 Go 标准库简单而强大的设计理念,为 Rust 提供了一个原生的 rpc 库。

为什么说是原生呢?因为你可以使用任何你熟悉的编程语言通过HTTP或者JSON-RPC2.0的方式访问Go或者Java实现的rpcx服务,但是除了Go/Java编程语言你没有办法使用raw的rpcx protocol实现TCP的网络调用,而基于TCP的RPC性能要远远高于Request-Response这种类HTTP的调用模型。

现在rpcx for rust的库也发布了: rpcx-rs。 你可以使用它原生的访问Go或者Java实现的rpcx服务,也可以使用Rust提供rpcx服务。

关键是,依然是那么的简单。

目前 rpcx-rs发布了 0.1.2 版本,可以方便的将 Rust 函数暴露成rpcx服务,也可以使用Rust访问rpcx服务,支持 JSON 和 MessagePack 两种序列化方式。

rpcx-rs秉承着Go语言版本的信念,采用最简单的方式实现rpc服务,小到一年级的小学生,大到花甲之年的老太太,都能轻松的实现rpc服务。

后续的功能会持续增加,包括protobuf序列化的支持、服务治理各种功能(路由、失败处理、重试、熔断器、限流)、监控(metrics、trace)、注册中心(etcd、consul)等众多的功能。

那么,让我们看看如何开发一个rust实现的rpcx服务,以及如何调用,相应的Go语言的代码也一并列出。

我们的例子是一个乘法服务,客户端传入两个整数, 服务器端计算出两个数的,然后返回给客户端。

客户端和服务器端公用的数据

客户端和服务器端交互,一般不会使用字节数组,而是封装的数据对象,像Rust和Go语言中的struct、Java中的Class等,所以我们先定义好交流用的数据结构,这样服务器和客户端都可以使用了。

你要在你的Cargo.toml文件中引入rpcx库:

1
rpcx = "0.1.2"

然后定义数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use std::error::Error as StdError;
use rmp_serde as rmps;
use serde::{Deserialize, Serialize};
use rpcx::*;
#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddArgs {
#[serde(rename = "A")]
pub a: u64,
#[serde(rename = "B")]
pub b: u64,
}
#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddReply {
#[serde(rename = "C")]
pub c: u64,
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Args struct {
A int
B int
}
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
return nil
}

我们使用serde实现 JSON 和 MessagePack 序列化格式的支持。你不必手工实现RpcxParam trait,通过 derive属性就可以自动为你的数据架构加上JSON 和 MessagePack 序列化格式的支持。同时你也需要在derive属性上加上DefaultSerialize, Deserialize,以便rpcx实现自动的序列化和初始化。

当然这一切都是通过属性完成的,你只需要定义数据结构即可。为了和Go默认的JSON属性相一致(GO默认序列化好的字段名称首字母是大写的),我们这里也加上了serde的属性,让serde进行 JSON序列化的时候使用指定的名称。

这里我们定义了传入参数ArithAddArgs和传出参数(ArithAddReply)。

服务端实现

服务器端实现了一个函数mul,函数的名称不重要,因为我们注册的时候会手工传入服务的路径名称和方法名称,也就是Go语言中实现的service_pathservice_method

函数的参数类型是ArithAddArgs,输出结果的类型是ArithAddReply

在main函数中我们新建了一个服务器,线程池中线程的数量默认是CPU的两倍,你也可以根据需要设置更大的线程数量。

然后我们注册了这个服务(函数),使用register_func!宏来进行注册,第一个参数是服务器实例,第二个参数是service_path、第三个参数是service_method,第四个参数是要注册的函数,第五个和第六个参数是函数的传入参数和传出参数的类型。

然后调用服务的start就可以提供服务了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use mul_model::{ArithAddArgs, ArithAddReply};
use rpcx::*;
fn mul(args: ArithAddArgs) -> ArithAddReply {
ArithAddReply { c: args.a * args.b }
}
fn main() {
let mut rpc_server = Server::new("0.0.0.0:8972".to_owned(), 0);
register_func!(rpc_server, "Arith", "Mul", mul, ArithAddArgs, ArithAddReply);
rpc_server.start().unwrap();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main
import (
"context"
"flag"
"fmt"
example "github.com/rpcx-ecosystem/rpcx-examples3"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
type Arith struct{}
// the second parameter is not a pointer
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
reply.C = args.A * args.B
fmt.Println("C=", reply.C)
return nil
}
func main() {
flag.Parse()
s := server.NewServer()
//s.Register(new(Arith), "")
s.RegisterName("Arith", new(Arith), "")
err := s.Serve("tcp", *addr)
if err != nil {
panic(err)
}
}

客户端实现

客户端使用Client::new先连接一个服务器,你可以指定序列化格式,服务器也会使用你的这个序列化格式返回结果,然后指定要调用的service_pathservice_method, 使用call返回结果(Option类型,因为有的调用不需要返回结果,返回结果是Result类型,可能是成功的结果,也可能是Error); 使用acall异步调用,会返回一个future。

你可以和Go语言的版本交叉着互相调用,可以看到很容易的我们就实现了rpc服务和调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use std::collections::hash_map::HashMap;
use mul_model::*;
use rpcx::Client;
use rpcx::{Result, SerializeType};
pub fn main() {
let mut c: Client = Client::new("127.0.0.1:8972");
c.start().map_err(|err| println!("{}", err)).unwrap();
c.opt.serialize_type = SerializeType::JSON;
let mut a = 1;
loop {
let service_path = String::from("Arith");
let service_method = String::from("Mul");
let metadata = HashMap::new();
let args = ArithAddArgs { a: a, b: 10 };
a = a + 1;
let reply: Option<Result<ArithAddReply>> =
c.call(service_path, service_method, false, metadata, &args);
if reply.is_none() {
continue;
}
let result_reply = reply.unwrap();
match result_reply {
Ok(r) => println!("received: {:?}", r),
Err(err) => println!("received err:{}", err),
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main
import (
"context"
"flag"
"log"
"github.com/smallnest/rpcx/protocol"
example "github.com/rpcx-ecosystem/rpcx-examples3"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
opt := client.DefaultOption
opt.SerializeType = protocol.JSON
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
defer xclient.Close()
args := example.Args{
A: 10,
B: 20,
}
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}