前面的两篇文章已经基本讲述了 gRPC 的使用方法,相信已经能够应对大多数开发场景了,只不过我们有时候需要针对业务作出一些定制,这时候就必须要更加了解 gRPC 的一些特性了,比如:
- 拦截器
- 截止时间
- 取消
拦截器
我把前一篇文章的订单 Order 服务的代码文件拷贝了一份,用来演示这一节的内容。
gRPC 在服务端和客户端均可以实现拦截器,首先来看一下服务端的拦截器。
服务端拦截器
针对一元通信模式,有对应的一元拦截器,拦截器的方法定义如下:
// 一元拦截器 func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) { //前置处理 log.Println("==========[Server Unary Interceptor]===========", info.FullMethod) //完成方法的正常执行 res, err = handler(ctx, req) //后置处理 log.Printf("After method call, res = %+v\n", res) return }
然后需要在服务端的 main 方法中注册一下这个拦截器:
// ... // 注册拦截器 s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor)) // ...
实现完成之后,所有服务端的一元通信模式的方法,都会被这个方法所拦截。
针对服务端流模式,也有对应的流拦截器,方法的定义稍微有一些区别了。
// 服务端流拦截器 type WrappedServerStream struct { grpc.ServerStream } func (w *WrappedServerStream) SendMsg(m interface{}) error { log.Printf("[order stream server interceptor] send a msg : %+v", m) return w.ServerStream.SendMsg(m) } func (w *WrappedServerStream) RecvMsg(m interface{}) error { log.Printf("[order stream server interceptor] recv a msg : %+v", m) return w.ServerStream.RecvMsg(m) } func NewWrappedServerStream(s grpc.ServerStream) *WrappedServerStream { return &WrappedServerStream{s} } func orderStreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Printf("=========[order stream]start %s\n", info.FullMethod) //执行方法 err := handler(srv, NewWrappedServerStream(ss)) if err != nil { log.Println("handle method err.", err) } log.Printf("=========[order stream]end") return nil }
在接收和发送消息时,都可以进行拦截,这样可以根据业务所需定制化。
在服务端的 main 方法中,还是需要注册一下这个拦截器:
// 注册拦截器 s := grpc.NewServer( grpc.UnaryInterceptor(orderUnaryServerInterceptor), grpc.StreamInterceptor(orderStreamServerInterceptor), )
客户端拦截器
和服务端类似,客户端拦截器也分为了一元拦截器和流拦截器,分别对应不同的通信模式。
首先来看看一元拦截器。
//客户端一元拦截器 func UnaryClientOrderInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { log.Println("=========[client interceptor] ", method) err = invoker(ctx, method, req, reply, cc, opts...) if err != nil { log.Println("invoke method err.", err) } log.Println("=========[client interceptor] end. reply : ", reply) return }
然后也需要在监听服务端的时候注册拦截器:
// ... conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientOrderInterceptor)) // ...
然后再来看下客户端流拦截器,先看下拦截器方法定义:
// 客户端流拦截器 type WrappedClientStream struct { grpc.ClientStream } func (w *WrappedClientStream) SendMsg(m interface{}) error { log.Printf("===========[client interceptor] send msg : %+v", m) return w.ClientStream.SendMsg(m) } func (w *WrappedClientStream) RecvMsg(m interface{}) error { log.Printf("============[client interceptor] recv msg : %+v", m) return w.ClientStream.RecvMsg(m) } func NewWrappedClientStream(s grpc.ClientStream) *WrappedClientStream { return &WrappedClientStream{s} } func StreamClientOrderInterceptor (ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { log.Printf("===========[client msg]start, method = %+v\n", method) clientStream, err := streamer(ctx, desc, cc, method, opts...) if err != nil { return nil, err } return NewWrappedClientStream(clientStream), nil }
最后也需要在客户端的 main 方法中注册拦截器:
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientOrderInterceptor), //注册拦截器 grpc.WithStreamInterceptor(StreamClientOrderInterceptor), )
截止时间
再来看一下另一个常用的模式截止时间,gRPC 常用于微服务之间的通信,而微服务中,服务之间的调用往往存在不确定性,比如网络环境差,数据聚合量太大等等,都有可能会导致服务调用者长时间等待响应结果。
这样对于用户体验来说非常的不好,并且也很浪费资源,所以微服务之间需要采用快速失败的模式,及时释放资源,不堆积请求,这样对系统的压力也会更小。
在 Go 语言中,对 gRPC 截止时间的控制,主要是使用 context 来实现的。下面是一个简单的 demo:
// 使用带有截止时间的context ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5 * time.Second)) //适当调整截止时间观察不同的调用效果 defer cancel() client := order.NewOrderManagementClient(conn) AddOrder(ctx, client)
这里使用了带有截止时间的 context,如果 AddOrder
方法的调用超过了截止时间,那么调用就会被取消。
取消
在某些情况下,我们可能需要取消 RPC 请求,这和设置截止时间有些类似,都是为了避免请求挂起让客户端一直等待,在 Go 语言中,取消的操作仍然借助于 context 来实现。
以一个简单的例子来说明取消 RPC 的操作:
// 取消RPC请求 func cancelRpcRequest(client order.OrderManagementClient) { ctx, cancelFunc := context.WithCancel(context.Background()) done := make(chan string) go func() { var id string defer func() { fmt.Println("结束执行, id = ", id) done <- id }() time.Sleep(2 * time.Second) id = AddOrder(ctx, client) log.Println("添加订单成功, id = ", id) }() //等待一秒后取消 time.Sleep(time.Second) cancelFunc() <-done }