最简单的 gRPC 教程—3 拦截器、截止、取消

简介: 前面的两篇文章已经基本讲述了 gRPC 的使用方法,相信已经能够应对大多数开发场景了,只不过我们有时候需要针对业务作出一些定制,这时候就必须要更加了解 gRPC 的一些特性了,比如:• 拦截器• 截止时间• 取消

前面的两篇文章已经基本讲述了 gRPC 的使用方法,相信已经能够应对大多数开发场景了,只不过我们有时候需要针对业务作出一些定制,这时候就必须要更加了解 gRPC 的一些特性了,比如:

  • 拦截器
  • 截止时间
  • 取消


拦截器


我把前一篇文章的订单 Order 服务的代码文件拷贝了一份,用来演示这一节的内容。

gRPC 在服务端和客户端均可以实现拦截器,首先来看一下服务端的拦截器。


服务端拦截器

针对一元通信模式,有对应的一元拦截器,拦截器的方法定义如下:

// 一元拦截器
func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) {
   //前置处理
   log.Println("==========[Server Unary Interceptor]===========", info.FullMethod)
   //完成方法的正常执行
   res, err = handler(ctx, req)
   //后置处理
   log.Printf("After method call, res = %+v\n", res)
   return
}


然后需要在服务端的 main 方法中注册一下这个拦截器:

// ...
// 注册拦截器
s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor))
// ...


实现完成之后,所有服务端的一元通信模式的方法,都会被这个方法所拦截。

针对服务端流模式,也有对应的流拦截器,方法的定义稍微有一些区别了。

// 服务端流拦截器
type WrappedServerStream struct {
   grpc.ServerStream
}
func (w *WrappedServerStream) SendMsg(m interface{}) error {
   log.Printf("[order stream server interceptor] send a msg : %+v", m)
   return w.ServerStream.SendMsg(m)
}
func (w *WrappedServerStream) RecvMsg(m interface{}) error {
   log.Printf("[order stream server interceptor] recv a msg : %+v", m)
   return w.ServerStream.RecvMsg(m)
}
func NewWrappedServerStream(s grpc.ServerStream) *WrappedServerStream {
   return &WrappedServerStream{s}
}
func orderStreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   log.Printf("=========[order stream]start %s\n", info.FullMethod)
   //执行方法
   err := handler(srv, NewWrappedServerStream(ss))
   if err != nil {
      log.Println("handle method err.", err)
   }
   log.Printf("=========[order stream]end")
   return nil
}


在接收和发送消息时,都可以进行拦截,这样可以根据业务所需定制化。

在服务端的 main 方法中,还是需要注册一下这个拦截器:

// 注册拦截器
s := grpc.NewServer(
   grpc.UnaryInterceptor(orderUnaryServerInterceptor),
   grpc.StreamInterceptor(orderStreamServerInterceptor),
)


客户端拦截器

和服务端类似,客户端拦截器也分为了一元拦截器和流拦截器,分别对应不同的通信模式。


首先来看看一元拦截器。

//客户端一元拦截器
func UnaryClientOrderInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
   log.Println("=========[client interceptor] ",  method)
   err = invoker(ctx, method, req, reply, cc, opts...)
   if err != nil {
      log.Println("invoke method err.", err)
   }
   log.Println("=========[client interceptor] end. reply : ", reply)
   return
}


然后也需要在监听服务端的时候注册拦截器:

// ...
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithUnaryInterceptor(UnaryClientOrderInterceptor))
// ...


然后再来看下客户端流拦截器,先看下拦截器方法定义:

// 客户端流拦截器
type WrappedClientStream struct {
   grpc.ClientStream
}
func (w *WrappedClientStream) SendMsg(m interface{}) error {
   log.Printf("===========[client interceptor] send msg : %+v", m)
   return w.ClientStream.SendMsg(m)
}
func (w *WrappedClientStream) RecvMsg(m interface{}) error {
   log.Printf("============[client interceptor] recv msg : %+v", m)
   return w.ClientStream.RecvMsg(m)
}
func NewWrappedClientStream(s grpc.ClientStream) *WrappedClientStream {
   return &WrappedClientStream{s}
}
func StreamClientOrderInterceptor (ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
   log.Printf("===========[client msg]start, method = %+v\n", method)
   clientStream, err := streamer(ctx, desc, cc, method, opts...)
   if err != nil {
      return nil, err
   }
   return NewWrappedClientStream(clientStream), nil
}


最后也需要在客户端的 main 方法中注册拦截器:

conn, err := grpc.Dial(address, 
   grpc.WithInsecure(),
   grpc.WithUnaryInterceptor(UnaryClientOrderInterceptor),    //注册拦截器
   grpc.WithStreamInterceptor(StreamClientOrderInterceptor),
)


截止时间

再来看一下另一个常用的模式截止时间,gRPC 常用于微服务之间的通信,而微服务中,服务之间的调用往往存在不确定性,比如网络环境差,数据聚合量太大等等,都有可能会导致服务调用者长时间等待响应结果。

这样对于用户体验来说非常的不好,并且也很浪费资源,所以微服务之间需要采用快速失败的模式,及时释放资源,不堆积请求,这样对系统的压力也会更小。

在 Go 语言中,对 gRPC 截止时间的控制,主要是使用 context 来实现的。下面是一个简单的 demo:

// 使用带有截止时间的context
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5 * time.Second)) //适当调整截止时间观察不同的调用效果
defer cancel()
client := order.NewOrderManagementClient(conn)
AddOrder(ctx, client)

这里使用了带有截止时间的 context,如果 AddOrder 方法的调用超过了截止时间,那么调用就会被取消。


取消

在某些情况下,我们可能需要取消 RPC 请求,这和设置截止时间有些类似,都是为了避免请求挂起让客户端一直等待,在 Go 语言中,取消的操作仍然借助于 context 来实现。

以一个简单的例子来说明取消 RPC 的操作:

// 取消RPC请求
func cancelRpcRequest(client order.OrderManagementClient) {
   ctx, cancelFunc := context.WithCancel(context.Background())
   done := make(chan string)
   go func() {
      var id string
      defer func() {
         fmt.Println("结束执行, id = ", id)
         done <- id
      }()
      time.Sleep(2 * time.Second)
      id = AddOrder(ctx, client)
      log.Println("添加订单成功, id = ", id)
   }()
   //等待一秒后取消
   time.Sleep(time.Second)
   cancelFunc()
   <-done
}


相关文章
|
4月前
|
前端开发 JavaScript UED
axios取消请求CancelToken的原理解析及用法示例
axios取消请求CancelToken的原理解析及用法示例
254 0
|
6月前
|
JavaScript 前端开发 API
【Azure 应用服务】Azure Function HTTP 触发后, 230秒就超时。而其他方式触发的Function, 执行5分钟后也超时,如何调整超时时间?
【Azure 应用服务】Azure Function HTTP 触发后, 230秒就超时。而其他方式触发的Function, 执行5分钟后也超时,如何调整超时时间?
|
7月前
|
负载均衡 Java API
通用快照方案问题之Feign添加请求拦截器如何解决
通用快照方案问题之Feign添加请求拦截器如何解决
48 1
|
中间件 程序员 Cloud Native
瞧一瞧 gRPC的拦截器 | 周末学习
上一次说到gRPC的认证总共有4种,其中介绍了常用且重要的2种
|
消息中间件 缓存 算法
Springboot----项目整合微信支付(引入延迟队列实现订单过期取消以及商户主动查单)
介绍了如何使用RabbitMQ实现订单过期自动取消以及如何采用RabbitMQ实现商户主动向微信支付后台查询订单状态,一石二鸟。
578 0
Springboot----项目整合微信支付(引入延迟队列实现订单过期取消以及商户主动查单)
|
前端开发 中间件 应用服务中间件
Laravel 请求周期详解
Laravel 8 通过引入 Laravel Jetstream,模型工厂类,迁移压缩,队列批处理,改善速率限制,队列改进,动态 Blade 组件,Tailwind 分页视图, 时间测试助手,artisan serve 的改进,事件监听器的改进,以及各种其他错误修复和可用性改进,对 Laravel 7.x 继续进行了改善。
143 0
|
SQL Go 数据库
SwiftUI 开源项目 - ZYSwiftUIFrame 自带服务端的完整示例项目(更新中...)
这是对我之前学习 SwiftUI 开发出的项目的功能骨架抽取
611 0
SwiftUI 开源项目 - ZYSwiftUIFrame 自带服务端的完整示例项目(更新中...)
|
API
【Nest教程】自定义拦截器处理处理响应数据
【Nest教程】自定义拦截器处理处理响应数据
516 0
【Nest教程】自定义拦截器处理处理响应数据