网站首页 返回列表 像“草根”一样,紧贴着地面,低调的存在,冬去春来,枯荣无恙。

通过Nginx实现gRPC服务的负载均衡 | gRPC双向数据流的交互控制系列(3)

2020-06-10 04:10:02 admin 1254

前情提要

本系列的第一篇文章
通过一个例子介绍了go语言实现gRPC双向数据流的交互控制,第二篇文章介绍了如何通过Websocket与gRPC交互。通过这两篇文章,我们可以一窥gRPC双向数据流的开发方式,但是在生产环境当中一台服务器(一个服务端程序)是不够的,我们往往会面临各种复杂情况:访问量上来了一台服务器不够怎么办?服务器挂了怎么办?有实战经验的读者肯定知道答案:上
负载均衡 (Load Balancing)啊!

gRPC服务如何做负载均衡?

gRPC官方博客上有一篇文章《gRPC Load
Balancing》(https://grpc.io/blog/loadbalancing),详细介绍了几种方案,并分析了几种方案各自的优劣。并附了一张解决方案表:

gRPC负载均衡解决方案表

在gRPC的Github上还有一篇文章叫《Load Balancing in
gRPC》(https://github.com/grpc/grpc/blob/master/doc/load- balancing.md),如果英文看着费劲可以看一篇中文的《gRPC服务发现&负载均衡》(https://segmentfault.com/a/1190000008672912)。


测试Nginx对gRPC服务的支持

因为上面几篇文章介绍的很详细了,所以本文不再展开讨论。我们可以注意到上表中被红框圈起来的部分写着“Nginx coming
soon”,现在这个Nginx的解决方案已经来了——2018年3月17日,Nginx官方宣布nginx 1.13.10支持gRPC
(https://www.nginx.com/blog/nginx-1-13-10-grpc/)

第一步:下载nginx最新的stable版(本文发稿时是1.14.0,如果会用docker的也可以下载其alpine版本)。
第二步:配置nginx的config文件如下

__

  1. server {
  2. # ⚠️ nginx的监听端口按你的实际情况设置
  3. listen 80 http2;
  4. access_log /var/log/nginx/access.log main;
  5. location / {
  6. # ⚠️ 把下面的 grpc://127.0.0.1:3000换成你自己的grpc服务器地址
  7. grpc_pass grpc://127.0.0.1:3000;
  8. }
  9. }

第三步:把go语言实现gRPC双向数据流的交互控制
一文中的client.go 中的服务端地址改为nginx服务的地址(比如:127.0.0.1:80)

第四步:
(1)运行server.go
(2)运行nginx服务
(3)运行client.go

如果没什么意外,gRPC客户端发出的消息可以通过nginx后被gRPC服务端收到。

nginx日志

我们可以通过nginx日志观察到相应的信息。

一个小坑

上述连接虽然已经实现,但是如果我们的客户端有连续一分钟没有输入信息,会出现接收信息出错的情况。

连接被nginx断开

这种情形在没有使用nginx的时候不会出现,由于以前使用nginx给websocket做反向代理时也出现过类似情况,故而推断是nginx对超过一段时间的连接进行了断开。

添加心跳

解决上述问题可以采取的一个方法是增加心跳(如果您发现了什么别的好办法可以解决这个问题,比如在nginx里配置一些参数,请留言告诉我?)

client.go

添加一段隔40秒发送心跳的代码

__

  1. package main
  2. import (
  3. "bufio"
  4. "context"
  5. "flag"
  6. "io"
  7. "log"
  8. "os"
  9. "time"
  10. "google.golang.org/grpc"
  11. proto "chat" // 根据proto文件自动生成的代码
  12. )
  13. var 服务器地址 string
  14. func init() {
  15. flag.StringVar(&服务器地址, "server", "127.0.0.1:80", "服务器地址")
  16. }
  17. func main() {
  18. // 创建连接
  19. conn, err := grpc.Dial(服务器地址, grpc.WithInsecure())
  20. if err != nil {
  21. log.Printf("连接失败: [%v]\n", err)
  22. return
  23. }
  24. defer conn.Close()
  25. client := proto.NewChatClient(conn)
  26. // 声明 context
  27. ctx := context.Background()
  28. // 创建双向数据流
  29. stream, err := client.BidStream(ctx)
  30. if err != nil {
  31. log.Printf("创建数据流失败: [%v]\n", err)
  32. return
  33. }
  34. // 启动一个 goroutine 接收命令行输入的指令
  35. go func() {
  36. log.Println("请输入消息...")
  37. 输入 := bufio.NewReader(os.Stdin)
  38. for {
  39. // 获取 命令行输入的字符串, 以回车 \n 作为结束标志
  40. 命令行输入的字符串, _ := 输入.ReadString('\n')
  41. // 向服务端发送 指令
  42. if err := stream.Send(&proto.Request{Input: 命令行输入的字符串}); err != nil {
  43. return
  44. }
  45. }
  46. }()
  47. //⚠️ 新添加的部分: 启动一个 goroutine 每隔40秒发送心跳包
  48. go func() {
  49. for {
  50. // 每隔 40 秒发送一次
  51. time.Sleep(40 * time.Second)
  52. log.Println("发送心跳包")
  53. // 心跳字符用"\n"
  54. if err := stream.Send(&proto.Request{Input: "\n"}); err != nil {
  55. return
  56. }
  57. }
  58. }()
  59. for {
  60. // 接收从 服务端返回的数据流
  61. 响应, err := stream.Recv()
  62. if err == io.EOF {
  63. log.Println("⚠️ 收到服务端的结束信号")
  64. break
  65. }
  66. if err != nil {
  67. // TODO: 处理接收错误
  68. log.Println("接收数据出错:", err)
  69. break
  70. }
  71. log.Printf("[客户端收到]: %s", 响应.Output)
  72. }
  73. }

server.go

添加一段检测心跳的代码

__

  1. package main
  2. import (
  3. "flag"
  4. "io"
  5. "log"
  6. "net"
  7. "strconv"
  8. "google.golang.org/grpc"
  9. proto "chat" // 根据proto文件自动生成的代码
  10. )
  11. // Streamer 服务端
  12. type Streamer struct{}
  13. // BidStream 实现了 ChatServer 接口中定义的 BidStream 方法
  14. func (s *Streamer) BidStream(stream proto.Chat_BidStreamServer) error {
  15. ctx := stream.Context()
  16. for {
  17. select {
  18. case <-ctx.Done():
  19. log.Println("收到客户端通过context发出的终止信号")
  20. return ctx.Err()
  21. default:
  22. // 接收从客户端发来的消息
  23. 输入, err := stream.Recv()
  24. if err == io.EOF {
  25. log.Println("客户端发送的数据流结束")
  26. return nil
  27. }
  28. if err != nil {
  29. log.Println("接收数据出错:", err)
  30. return err
  31. }
  32. // 如果接收正常,则根据接收到的 字符串 执行相应的指令
  33. switch 输入.Input {
  34. case "结束对话\n", "结束对话":
  35. log.Println("收到'结束对话'指令")
  36. if err := stream.Send(&proto.Response{Output: "收到结束指令"}); err != nil {
  37. return err
  38. }
  39. // 收到结束指令时,通过 return nil 终止双向数据流
  40. return nil
  41. case "返回数据流\n", "返回数据流":
  42. log.Println("收到'返回数据流'指令")
  43. // 收到 收到'返回数据流'指令, 连续返回 10 条数据
  44. for i := 0; i < 10; i++ {
  45. if err := stream.Send(&proto.Response{Output: "数据流 #" + strconv.Itoa(i)}); err != nil {
  46. return err
  47. }
  48. }
  49. // ⚠️ 拦截心跳字符"\n"
  50. case "\n":
  51. log.Println("收到心跳包")
  52. // 只接收心跳不回发数据也可以
  53. default:
  54. // 缺省情况下, 返回 '服务端返回: ' + 输入信息
  55. log.Printf("[收到消息]: %s", 输入.Input)
  56. if err := stream.Send(&proto.Response{Output: "服务端返回: " + 输入.Input}); err != nil {
  57. return err
  58. }
  59. }
  60. }
  61. }
  62. }
  63. var 服务端口 string
  64. func init() {
  65. flag.StringVar(&服务端口, "port", "3000", "服务端口")
  66. }
  67. func main() {
  68. log.Println("启动服务端...")
  69. server := grpc.NewServer()
  70. // 注册 ChatServer
  71. proto.RegisterChatServer(server, &Streamer{})
  72. address, err := net.Listen("tcp", ":"+服务端口)
  73. if err != nil {
  74. panic(err)
  75. }
  76. if err := server.Serve(address); err != nil {
  77. panic(err)
  78. }
  79. }

添加完成后再度测试,连接不会再被nginx打断。


Nginx实现服务端负载均衡的配置文件

心跳的坑趟过去之后,剩下的其实就简单了,我们修改nginx的配置文件:

__

  1. upstream backend {
  2. # ⚠️ 把下面的服务端地址和端口改成你自己的
  3. server 127.0.0.1:3000;
  4. server 127.0.0.1:3001;
  5. }
  6. server {
  7. listen 80 http2;
  8. access_log /var/log/nginx/access.log main;
  9. location / {
  10. grpc_pass grpc://backend;
  11. }
  12. }

按如下顺序启动
(1)运行 多个 server.go ,按照nginx配置文件输入端口参数(如 server.go -port 3001)

(2)运行nginx服务

(3)运行 多个 client.go, (也可以运行websocket的那个程序, 记得把心跳代码加上 ,多开几个浏览器窗口)

我们可以观察到开启的多个server都在进行gRPC数据流服务,至此大功告成?!


总结

gRPC服务端的负载均衡有很多种方案,也各有优劣,但是用Nginx似乎是最简单的一种。总之,我们还得根据具体的业务场景来选择具体的实现方案。


gRPC双向数据流系列

(之一): gRPC双向数据流的交互控制
(之二): 通过Websocket与gRPC交互

关键字词nginxgrpc

分享到:

如需留言,请 登录,没有账号?请 注册

0 条评论 0 人参与

顶部 底部