安装
在我们的项目根下,在命令行执行 Go 语言的 gRPC 库的安装命令,如下:
$ go get -u google.golang.org/grpc@v1.29.1
示例
修改 hello.proto 文件,新增了 HelloService 接口:
syntax = "proto3"; package proto; message String { string value = 1; } service HelloService { rpc Hello (String) returns (String); } 复制代码
然后使用 protoc-gen-go 内置的 gRPC 插件生成 gRPC 代码:
$ protoc --go_out=plugins=grpc:. ./proto/*.proto
查看生产的 hello.pb.go 文件,gRPC 插件为服务端和客户端生成不同的接口:
// HelloServiceServer is the server API for HelloService service. type HelloServiceServer interface { Hello(context.Context, *String) (*String, error) } // HelloServiceClient is the client API for HelloService service. type HelloServiceClient interface { Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) } 复制代码
gRPC 通过 context.Context 参数,为每个方法调用提供了上下文支持。
基于服务端的 HelloServiceServer 接口,我们重新来实现 HelloService 服务:
package main import ( "context" "google.golang.org/grpc" "log" "net" pb "rpc/proto" // 设置引用别名 ) type HelloServiceImpl struct{} func (p *HelloServiceImpl) Hello(ctx context.Context, args *pb.String) (*pb.String, error) { reply := &pb.String{Value: "hello:" + args.GetValue()} return reply, nil } func main() { grpcServer := grpc.NewServer() pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl)) lis, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal(err) } grpcServer.Serve(lis) } 复制代码
首先通过
grpc.NewServer()
构造一个 gRPC 服务对象,然后通过 gRPC 插件生成的RegisterHelloServiceServer
函数注册我们实现的 HelloServiceImpl 服务。然后通过grpcServer.Serve(lis)
在一个监听端口上提供 gRPC 服务。
客户端链接 gRPC 服务:
package main import ( "context" "fmt" "google.golang.org/grpc" "log" pb "rpc/proto" // 设置引用别名 ) func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal("dialing err:", err) } defer conn.Close() client := pb.NewHelloServiceClient(conn) reply, err := client.Hello(context.Background(), &pb.String{Value: "wekenw"}) if err != nil { log.Fatal(err) } fmt.Println(reply.GetValue()) } 复制代码
其中 grpc.Dial 负责和 gRPC 服务建立链接,然后
NewHelloServiceClient
函数基于已经建立的链接构造 HelloServiceClient 对象。返回的 client 其实是一个 HelloServiceClient 接口对象,通过接口定义的方法就可以调用服务端对应的 gRPC 服务提供的方法。
开启服务器端,开启客户端。客户端的执行结果如下:
$ go run client.go hello:wekenw 复制代码
以上为 grpc 的一元 RPC(Unary RPC)
调用方式。还有三种方式,下面我们分别介绍下。
Server-side streaming RPC:服务端流式 RPC
服务器端流式 RPC,单向流,Server 为 Stream,Client 为普通的一元 RPC 请求。
简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。
Proto :
syntax = "proto3"; package proto; message String { string value = 1; } service HelloService { rpc Hello (String) returns (stream String){}; } 复制代码
Server:
package main import ( "google.golang.org/grpc" "log" "net" pb "rpc/proto" // 设置引用别名 "strconv" ) // HelloServiceImpl 定义我们的服务 type HelloServiceImpl struct{} //实现Hello方法 func (p *HelloServiceImpl) Hello(req *pb.String, srv pb.HelloService_HelloServer) error { for n := 0; n < 5; n++ { // 向流中发送消息, 默认每次send送消息最大长度为`math.MaxInt32`bytes err := srv.Send(&pb.String{ Value: req.Value + strconv.Itoa(n), }) if err != nil { return err } } return nil } func main() { // 新建gRPC服务器实例 grpcServer := grpc.NewServer() // 在gRPC服务器注册我们的服务 pb.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl)) lis, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal(err) } log.Println(" net.Listing...") //用服务器 Serve() 方法以及我们的端口信息区实现阻塞等待,直到进程被杀死或者 Stop() 被调用 err = grpcServer.Serve(lis) if err != nil { log.Fatalf("grpcServer.Serve err: %v", err) } } 复制代码
Hello 的参数和返回值是编译 proto 时生成的 .pb.go 文件中有定义,我们只需要实现就可以了。
Server 端,主要留意 stream.Send 方法,通过阅读源码,可得知是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:
- 消息体(对象)序列化。
- 压缩序列化后的消息体。
- 对正在传输的消息体增加 5 个字节的 header(标志位)。
- 判断压缩 + 序列化后的消息体总字节长度是否大于预设的 maxSendMessageSize(预设值为 math.MaxInt32),若超出则提示错误。
- 写入给流的数据集。
Client:
package main import ( "context" "google.golang.org/grpc" "io" "log" pb "rpc/proto" // 设置引用别名 ) // SayHello 调用服务端的 Hello 方法 func SayHello(client pb.HelloServiceClient, r *pb.String) error { stream, _ := client.Hello(context.Background(), r) for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("resp: %v", resp) } return nil } func main() { conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure()) if err != nil { log.Fatal("dialing err:", err) } defer conn.Close() // 建立gRPC连接 client := pb.NewHelloServiceClient(conn) // 创建发送结构体 req := pb.String{ Value: "stream server grpc ", } SayHello(client, &req) } 复制代码
在 Client 端,主要留意 stream.Recv()
方法,此方法,是对 ClientStream.RecvMsg
方法的封装,而 RecvMsg
方法会从流中读取完整的 gRPC 消息体,我们可得知:
- RecvMsg 是阻塞等待的。
- RecvMsg 当流成功/结束(调用了 Close)时,会返回 io.EOF。
- RecvMsg 当流出现任何错误时,流会被中止,错误信息会包含 RPC 错误码。而在 RecvMsg 中可能出现如下错误,例如:
- io.EOF、io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes(gRPC 的预定义错误码) 需要注意的是,默认的 MaxReceiveMessageSize 值为 1024 1024 4,若有特别需求,可以适当调整。
开启服务器端,开启客户端。执行结果如下:
$ go run server.go 2021/11/16 21:57:18 net.Listing... 复制代码
$ go run client.go 2021/11/16 21:57:31 resp: value:"stream server grpc 0" 2021/11/16 21:57:31 resp: value:"stream server grpc 1" 2021/11/16 21:57:31 resp: value:"stream server grpc 2" 2021/11/16 21:57:31 resp: value:"stream server grpc 3" 2021/11/16 21:57:31 resp: value:"stream server grpc 4" 复制代码