RPC是对进程内函数调用的分布式开展,它将进程内的函数调用,扩展成对远程机器上的相应的函数的调用。如何使用最少的代码,用最容易的方式调用远程函数呢?
首先需要知道远程机器的IP地址和端口,然后呢,使用原先进程内的方式直接调用即可,这是最容易的RPC调用。容易不意味着功能简单,而是rpc库背后默默为你承担了序列化、网络传输、路由、服务调用、服务治理的功能。
rpcx秉承 Go 标准库简单而强大的设计理念,为 Rust 提供了一个原生的 rpc 库。
为什么说是原生呢?因为你可以使用任何你熟悉的编程语言通过HTTP或者JSON-RPC2.0的方式访问Go或者Java实现的rpcx服务,但是除了Go/Java编程语言你没有办法使用raw的rpcx protocol实现TCP的网络调用,而基于TCP的RPC性能要远远高于Request-Response这种类HTTP的调用模型。
现在rpcx for rust的库也发布了: rpcx-rs。 你可以使用它原生的访问Go或者Java实现的rpcx服务,也可以使用Rust提供rpcx服务。
关键是,依然是那么的简单。
目前 rpcx-rs发布了 0.1.2 版本,可以方便的将 Rust 函数暴露成rpcx服务,也可以使用Rust访问rpcx服务,支持 JSON 和 MessagePack 两种序列化方式。
rpcx-rs秉承着Go语言版本的信念,采用最简单的方式实现rpc服务,小到一年级的小学生,大到花甲之年的老太太,都能轻松的实现rpc服务。
后续的功能会持续增加,包括protobuf序列化的支持、服务治理各种功能(路由、失败处理、重试、熔断器、限流)、监控(metrics、trace)、注册中心(etcd、consul)等众多的功能。
那么,让我们看看如何开发一个rust实现的rpcx服务,以及如何调用,相应的Go语言的代码也一并列出。
我们的例子是一个乘法
服务,客户端传入两个整数, 服务器端计算出两个数的积
,然后返回给客户端。
客户端和服务器端公用的数据
客户端和服务器端交互,一般不会使用字节数组,而是封装的数据对象,像Rust和Go语言中的struct
、Java中的Class等,所以我们先定义好交流用的数据结构,这样服务器和客户端都可以使用了。
你要在你的Cargo.toml文件中引入rpcx库:
然后定义数据结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| use std::error::Error as StdError; use rmp_serde as rmps; use serde::{Deserialize, Serialize}; use rpcx::*; #[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)] pub struct ArithAddArgs { #[serde(rename = "A")] pub a: u64, #[serde(rename = "B")] pub b: u64, } #[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)] pub struct ArithAddReply { #[serde(rename = "C")] pub c: u64, }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| type Args struct { A int B int } type Reply struct { C int } type Arith int func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error { reply.C = args.A * args.B fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C) return nil }
|
我们使用serde
实现 JSON 和 MessagePack 序列化格式的支持。你不必手工实现RpcxParam
trait,通过 derive
属性就可以自动为你的数据架构加上JSON 和 MessagePack 序列化格式的支持。同时你也需要在derive属性上加上Default
、 Serialize
, Deserialize
,以便rpcx实现自动的序列化和初始化。
当然这一切都是通过属性完成的,你只需要定义数据结构即可。为了和Go默认的JSON属性相一致(GO默认序列化好的字段名称首字母是大写的),我们这里也加上了serde的属性,让serde进行 JSON序列化的时候使用指定的名称。
这里我们定义了传入参数ArithAddArgs
和传出参数(ArithAddReply
)。
服务端实现
服务器端实现了一个函数mul
,函数的名称不重要,因为我们注册的时候会手工传入服务的路径名称和方法名称,也就是Go语言中实现的service_path
和service_method
。
函数的参数类型是ArithAddArgs
,输出结果的类型是ArithAddReply
。
在main函数中我们新建了一个服务器,线程池中线程的数量默认是CPU的两倍,你也可以根据需要设置更大的线程数量。
然后我们注册了这个服务(函数),使用register_func!
宏来进行注册,第一个参数是服务器实例,第二个参数是service_path
、第三个参数是service_method
,第四个参数是要注册的函数,第五个和第六个参数是函数的传入参数和传出参数的类型。
然后调用服务的start
就可以提供服务了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| use mul_model::{ArithAddArgs, ArithAddReply}; use rpcx::*; fn mul(args: ArithAddArgs) -> ArithAddReply { ArithAddReply { c: args.a * args.b } } fn main() { let mut rpc_server = Server::new("0.0.0.0:8972".to_owned(), 0); register_func!(rpc_server, "Arith", "Mul", mul, ArithAddArgs, ArithAddReply); rpc_server.start().unwrap(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package main import ( "context" "flag" "fmt" example "github.com/rpcx-ecosystem/rpcx-examples3" "github.com/smallnest/rpcx/server" ) var ( addr = flag.String("addr", "localhost:8972", "server address") ) type Arith struct{} func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error { reply.C = args.A * args.B fmt.Println("C=", reply.C) return nil } func main() { flag.Parse() s := server.NewServer() s.RegisterName("Arith", new(Arith), "") err := s.Serve("tcp", *addr) if err != nil { panic(err) } }
|
客户端实现
客户端使用Client::new
先连接一个服务器,你可以指定序列化格式,服务器也会使用你的这个序列化格式返回结果,然后指定要调用的service_path
和service_method
, 使用call
返回结果(Option类型,因为有的调用不需要返回结果,返回结果是Result类型,可能是成功的结果,也可能是Error); 使用acall
异步调用,会返回一个future。
你可以和Go语言的版本交叉着互相调用,可以看到很容易的我们就实现了rpc服务和调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| use std::collections::hash_map::HashMap; use mul_model::*; use rpcx::Client; use rpcx::{Result, SerializeType}; pub fn main() { let mut c: Client = Client::new("127.0.0.1:8972"); c.start().map_err(|err| println!("{}", err)).unwrap(); c.opt.serialize_type = SerializeType::JSON; let mut a = 1; loop { let service_path = String::from("Arith"); let service_method = String::from("Mul"); let metadata = HashMap::new(); let args = ArithAddArgs { a: a, b: 10 }; a = a + 1; let reply: Option<Result<ArithAddReply>> = c.call(service_path, service_method, false, metadata, &args); if reply.is_none() { continue; } let result_reply = reply.unwrap(); match result_reply { Ok(r) => println!("received: {:?}", r), Err(err) => println!("received err:{}", err), } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package main import ( "context" "flag" "log" "github.com/smallnest/rpcx/protocol" example "github.com/rpcx-ecosystem/rpcx-examples3" "github.com/smallnest/rpcx/client" ) var ( addr = flag.String("addr", "localhost:8972", "server address") ) func main() { flag.Parse() d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "") opt := client.DefaultOption opt.SerializeType = protocol.JSON xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt) defer xclient.Close() args := example.Args{ A: 10, B: 20, } reply := &example.Reply{} err := xclient.Call(context.Background(), "Mul", args, reply) if err != nil { log.Fatalf("failed to call: %v", err) } log.Printf("%d * %d = %d", args.A, args.B, reply.C) }
|