gRPC三种流和消息格式(一)

简介: gRPC三种流和消息格式

消息格式

RPC流

服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。

  • 调用存根方法
  • 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置content-typeapplication/grpc
  • 到达服务端,会先检查请求头是不是gRPC请求,否则返回415

长度前缀的消息分帧

在写入消息前,先写入长度消息表明每条消息的大小。

每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB

帧首中还有单字节无符号整数,用来表明数据是否进行了压缩

为1表示使用 message-encoding中的编码机制进行了压缩

请求消息

客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记

1、对于gRPC 都是POST

2、协议:Http/Https

3、/服务名/方法名

4、目标URI的主机名

5、对不兼容代理的检测,gRPC下这个值必须为 trailers

6、超时时间

7、媒体类型

8、压缩类型

当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧

响应信息

服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers

END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer

三种流

一元RPC

通信时始终只有一个请求和一个响应

protocol buffer

syntax = "proto3";
package hello;
// 第一个分割参数,输出路径;第二个设置生成类的包路径
option go_package = "./proto/hello";
// 设置服务名称
service Greeter {
  // 设置方法
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// 请求信息用户名.
message HelloRequest {
  string name = 1;
}
// 响应信息
message HelloReply {
  string message = 1;
}

服务端

package main
import (
  "context"
  "flag"
  "fmt"
  "log"
  "net"
  "google.golang.org/grpc"
  pb "mygrpc/proto/hello"
)
var (
  port = flag.Int("port", 50051, "The server port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  log.Printf("Received: %v", in.GetName())
  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("server listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

客户端

package main
import (
  "context"
  "flag"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer conn.Close()
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  if err != nil {
    log.Fatalf("could not greet: %v", err)
  }
  log.Printf("Greeting: %s", r.GetMessage())
}

服务流RPC

通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应。

在protobuf中的 service添加以下代码

rpc searchOrders(google.protobuf.StringValue) returns (stream Order);

服务端代码

package main
import (
  "context"
  "flag"
  "fmt"
  "google.golang.org/grpc"
  "io"
  "log"
  pb "mygrpc/proto/hello"
  "net"
)
var (
  port = flag.Int("port", 50051, "The service port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  log.Printf("Received: %v", in.GetName())
  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
  log.Printf("Recved %v", req.GetName())
  // 具体返回多少个response根据业务逻辑调整
  for i := 0; i < 10; i++ {
    // 通过 send 方法不断推送数据
    err := stream.Send(&pb.HelloReply{})
    if err != nil {
      log.Fatalf("Send error:%v", err)
      return err
    }
  }
  return nil
}
func (s *server) UpdateOrders(stream pb.Greeter_UpdateOrdersServer) error {
  for {
    log.Println("开始接受客户端的流")
    // Recv 对客户端发来的请求接收
    order, err := stream.Recv()
    if err == io.EOF {
      // 流结束,关闭并发送响应给客户端
      return stream.Send(&pb.HelloReply{Message: "接受客户流结束"})
    }
    if err != nil {
      return err
    }
    // 更新数据
    log.Printf("Order ID : %s - %s", order.GetName(), "Updated")
  }
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("service listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}


gRPC三种流和消息格式(二)https://developer.aliyun.com/article/1392118

相关文章
|
16天前
|
编解码 Android开发 开发者
如何在轻量级RTSP服务中玩转H.264扩展SEI,实现自定义数据的发送与接收?
【9月更文挑战第4天】本文详细介绍了如何在轻量级RTSP服务中实现H.264标准的扩展SEI功能,包括环境准备、依赖引入、RTSP服务创建、自定义SEI数据发送与接收等步骤,并提供了具体代码示例,帮助开发者更好地利用SEI在视频流中嵌入元数据。
30 2
|
23天前
|
消息中间件 安全 Java
构建基于RabbitMQ的安全消息传输管道
【8月更文第28天】在分布式系统中,消息队列如RabbitMQ为应用间的数据交换提供了可靠的支持。然而,随着数据的敏感性增加,确保这些消息的安全传输变得至关重要。本文将探讨如何在RabbitMQ中实施一系列安全措施,包括加密通信、认证和授权机制,以保护敏感信息。
26 1
|
1月前
|
编解码 Android开发
### 揭秘!如何在轻量级RTSP服务中玩转H.264扩展SEI,实现自定义数据的发送与接收?
【8月更文挑战第14天】本文介绍如何在轻量级RTSP服务中实现H.264的SEI功能,允许在视频流中嵌入自定义数据。首先确保环境已安装Android Studio并具备基础开发技能。接着,通过Gradle添加必要依赖如`jrtsp`。创建RTSP服务并配置H.264编码器支持SEI。编码过程中可添加自定义SEI数据,并在客户端解析这些数据。此方案适用于需在视频流中传递元数据的应用场景。
30 0
|
3月前
|
安全 C++
gRPC 四模式之 客户端流RPC模式
gRPC 四模式之 客户端流RPC模式
40 0
|
3月前
|
C++
gRPC 四模式之 服务器端流RPC模式
gRPC 四模式之 服务器端流RPC模式
73 0
|
3月前
|
存储 C++
gRPC 四模式之 双向流RPC模式
gRPC 四模式之 双向流RPC模式
93 0
|
4月前
|
人工智能 Java
通过okhttp调用SSE流式接口,并将消息返回给客户端
通过okhttp调用SSE流式接口,并将消息返回给客户端
|
4月前
|
消息中间件 JSON 监控
Kafka 的消息格式:了解消息结构与序列化
Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。
|
4月前
gRPC三种流和消息格式(二)
gRPC三种流和消息格式
80 0
|
编解码 Linux 定位技术
如何在轻量级RTSP服务支持H.264扩展SEI发送接收自定义数据?
如何在轻量级RTSP服务支持H.264扩展SEI发送接收自定义数据?
145 0