gRPC三种流和消息格式(二)

简介: gRPC三种流和消息格式

gRPC三种流和消息格式(一)https://developer.aliyun.com/article/1392117


客户端代码

package main
import (
  "context"
  "flag"
  "io"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  searchStream, _ := c.SearchOrders(ctx, &pb.HelloRequest{Name: "开始服务端rpc流测试"})
  for {
    // 客户端 Recv 方法接收服务端发送的流
    searchOrder, err := searchStream.Recv()
    if err == io.EOF {
      log.Print("EOF")
      break
    }
    if err == nil {
      log.Print("Search Result : ", searchOrder)
    }
  }
  r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
  if err != nil {
    log.Fatalf("could not greet: %v", err)
  }
  log.Printf("Greeting: %s", r.GetMessage())
}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

在protobuf中的 service添加以下代码

rpc updateOrders(stream HelloRequest) returns (stream HelloReply);

服务端代码

func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
  ordersStr := "Updated Order IDs : "
  for {
    // Recv 对客户端发来的请求接收
    order, err := stream.Recv()
    if err == io.EOF {
      // 流结束,关闭并发送响应给客户端
      return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
    }
    if err != nil {
      return err
    }
    // 更新数据
    orderMap[order.Id] = *order
    log.Printf("Order ID : %s - %s", order.Id, "Updated")
    ordersStr += order.Id + ", "
  }
}

客户端代码

package main
import (
  "context"
  "flag"
  "log"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 调用指定方法
  updateStream, err := c.UpdateOrders(ctx)
  if err != nil {
    log.Fatalf("%v.UpdateOrders(_) = _, %v", c, err)
  }
  // Updating order 1
  if err := updateStream.Send(&pb.HelloRequest{Name: "1"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "1"}, err)
  }
  // Updating order 2
  if err := updateStream.Send(&pb.HelloRequest{Name: "2"}); err != nil {
    log.Fatalf("%v.Send(%v) = %v", updateStream, &pb.HelloRequest{Name: "2"}, err)
  }
  // 发送关闭信号并接收服务端响应
  err = updateStream.CloseSend()
  if err != nil {
    log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
  }
  log.Printf("客户端流传输结束")
}

双工流RPC

对应的业务就比如实时的消息流

protobuf

// 设置双工rpc
  rpc processOrders(stream HelloRequest) returns (stream HelloReply);

服务端

package main
import (
  "flag"
  "fmt"
  "google.golang.org/grpc"
  "io"
  "log"
  pb "mygrpc/proto/hello"
  "net"
  "sync"
)
var (
  port = flag.Int("port", 50051, "The service port")
)
type server struct {
  pb.UnimplementedGreeterServer
}
func (s *server) ProcessOrders(stream pb.Greeter_ProcessOrdersServer) error {
  var (
    waitGroup sync.WaitGroup // 一组 goroutine 的结束
    // 设置通道
    msgCh = make(chan *pb.HelloReply)
  )
  // 计数器加1
  waitGroup.Add(1)
  // 消费队列中的内容
  go func() {
    // 计数器减一
    defer waitGroup.Done()
    for {
      v := <-msgCh
      fmt.Println(v)
      err := stream.Send(v)
      if err != nil {
        fmt.Println("Send error:", err)
        break
      }
    }
  }()
  waitGroup.Add(1)
  // 向队列中添加内容
  go func() {
    defer waitGroup.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        break
      }
      if err != nil {
        log.Fatalf("recv error:%v", err)
      }
      fmt.Printf("Recved :%v \n", req.GetName())
      msgCh <- &pb.HelloReply{Message: "服务端传输数据"}
    }
    close(msgCh)
  }()
  // 等待 计数器问0 推出
  waitGroup.Wait()
  // 返回nil表示已经完成响应
  return nil
}
func main() {
  flag.Parse()
  lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }
  // 开启rpc
  s := grpc.NewServer()
  // 注册服务
  pb.RegisterGreeterServer(s, &server{})
  log.Printf("service listening at %v", lis.Addr())
  if err := s.Serve(lis); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

客户端

package main
import (
  "context"
  "flag"
  "fmt"
  "io"
  "log"
  "sync"
  "time"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "mygrpc/proto/hello" // 引入编译生成的包
)
const (
  defaultName = "world"
)
var (
  addr = flag.String("addr", "localhost:50051", "the address to connect to")
  name = flag.String("name", defaultName, "Name to greet")
)
func main() {
  flag.Parse()
  // 与服务建立连接.
  conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer func(conn *grpc.ClientConn) {
    err := conn.Close()
    if err != nil {
    }
  }(conn)
  // 创建指定服务的客户端
  c := pb.NewGreeterClient(conn)
  // 连接服务器并打印出其响应。
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  // 设置
  var wg sync.WaitGroup
  // 调用指定方法
  stream, _ := c.ProcessOrders(ctx)
  if err != nil {
    panic(err)
  }
  // 3.开两个goroutine 分别用于Recv()和Send()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for {
      req, err := stream.Recv()
      if err == io.EOF {
        fmt.Println("Server Closed")
        break
      }
      if err != nil {
        continue
      }
      fmt.Printf("Recv Data:%v \n", req.GetMessage())
    }
  }()
  wg.Add(1)
  go func() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
      err := stream.Send(&pb.HelloRequest{Name: "hello world"})
      if err != nil {
        log.Printf("send error:%v\n", err)
      }
    }
    // 4. 发送完毕关闭stream
    err := stream.CloseSend()
    if err != nil {
      log.Printf("Send error:%v\n", err)
      return
    }
  }()
  wg.Wait()
  log.Printf("服务端流传输结束")
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

相关文章
|
3月前
|
消息中间件 安全 Java
构建基于RabbitMQ的安全消息传输管道
【8月更文第28天】在分布式系统中,消息队列如RabbitMQ为应用间的数据交换提供了可靠的支持。然而,随着数据的敏感性增加,确保这些消息的安全传输变得至关重要。本文将探讨如何在RabbitMQ中实施一系列安全措施,包括加密通信、认证和授权机制,以保护敏感信息。
66 1
|
3月前
|
编解码 Android开发
### 揭秘!如何在轻量级RTSP服务中玩转H.264扩展SEI,实现自定义数据的发送与接收?
【8月更文挑战第14天】本文介绍如何在轻量级RTSP服务中实现H.264的SEI功能,允许在视频流中嵌入自定义数据。首先确保环境已安装Android Studio并具备基础开发技能。接着,通过Gradle添加必要依赖如`jrtsp`。创建RTSP服务并配置H.264编码器支持SEI。编码过程中可添加自定义SEI数据,并在客户端解析这些数据。此方案适用于需在视频流中传递元数据的应用场景。
40 0
|
5月前
|
安全 C++
gRPC 四模式之 客户端流RPC模式
gRPC 四模式之 客户端流RPC模式
50 0
|
5月前
|
C++
gRPC 四模式之 服务器端流RPC模式
gRPC 四模式之 服务器端流RPC模式
117 0
|
6月前
gRPC三种流和消息格式(一)
gRPC三种流和消息格式
221 0
|
5月前
|
存储 C++
gRPC 四模式之 双向流RPC模式
gRPC 四模式之 双向流RPC模式
170 0
|
6月前
|
人工智能 Java
通过okhttp调用SSE流式接口,并将消息返回给客户端
通过okhttp调用SSE流式接口,并将消息返回给客户端
|
6月前
|
消息中间件 JSON 监控
Kafka 的消息格式:了解消息结构与序列化
Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。
|
6月前
|
网络协议 数据格式
HTTP鸡础(传输协议,特点,历史版本,请求消息数据格式)
HTTP鸡础(传输协议,特点,历史版本,请求消息数据格式)
|
网络协议 索引
HTTP/2 协议(帧、消息、流简单的抓包分析)
HTTP/2 协议(帧、消息、流简单的抓包分析)
602 0