17

grpc 中的 stream(服务中的长连接,订阅,上传大数据等)

 4 years ago
source link: https://studygolang.com/articles/28064
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言:

之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

stream的种类:

客户端推送 服务端 rpc GetStream (StreamReqData) returns (stream StreamResData){}

服务端推送 客户端 rpc PutStream (stream StreamReqData) returns (StreamResData){}

客户端与 服务端 互相 推送 rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

其实这个流 已经 基本退化成 tcp了,grpc 底层为我们 分包了,所以真的很方便。

protobuf的定义:

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名

package pro;

//声明grpc服务

service Greeter {

/*

以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。

*/

rpc GetStream (StreamReqData) returns (stream StreamResData){}

rpc PutStream (stream StreamReqData) returns (StreamResData){}

rpc AllStream (stream StreamReqData) returns (stream StreamResData){}

}

//stream请求结构

message StreamReqData {

string data = 1;

}

//stream返回结构

message StreamResData {

string data = 1;

}

我们在 protobuf 里面 定义 要提供的服务,如果 你想把哪个数据 源源不断的 推送 就在前面加个stream 就好了,定义好记得编译。

服务端的实现:

package main

import (

"context"

"fmt"

"google.golang.org/grpc"

"grpc/pro"

"log"

"net"

"sync"

"time"

)

const PORT = ":50051"

type server struct {

}

//服务端 单向流

func (s *server)GetStream(req

pro.StreamReqData, res pro.Greeter_GetStreamServer) error{

i:= 0

for{

i++

res.Send(&pro.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})

time.Second)

if i >10 {

break

}

}

return nil

}

//客户端 单向流

func (this *server) PutStream(cliStr pro.Greeter_PutStreamServer) error {

for {
    if tem, err := cliStr.Recv(); err == nil {
        log.Println(tem)
    } else {
        log.Println("break, err :", err)
        break
    }
}

return nil

}

//客户端服务端 双向流

func(this *server) AllStream(allStr pro.Greeter_AllStreamServer) error {

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
    for {
        data, _ := allStr.Recv()
        log.Println(data)
    }
    wg.Done()
}()

go func() {
    for {
        allStr.Send(&pro.StreamResData{Data:"ssss"})
        time.Sleep(time.Second)
    }
    wg.Done()
}()

wg.Wait()
return nil

}

func main(){

//监听端口

lis,err := net.Listen("tcp",PORT)

if err != nil{

return

}

//创建一个grpc 服务器

s := grpc.NewServer()

//注册事件

pro.RegisterGreeterServer(s,&server{})

//处理链接

s.Serve(lis)

}

知识点:

每个函数都对应着 完成了 protobuf 里面的 定义。

每个函数 形参都有对应的 推送 或者 接收 对象,我们只要 不断循环 Recv(),或者 Send() 就能接收或者推送了!

当return出函数,就说明此次 推送 或者 接收 结束了,client 会 对应的 收到消息!

客户端调用:

package main

import (

"google.golang.org/grpc"

"grpc/pro"
"log"
"context"
"time"
_ "google.golang.org/grpc/balancer/grpclb"

)

const (

ADDRESS = "localhost:50051"

)

func main(){

//通过grpc 库 建立一个连接

conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())

if err != nil{

return

}

defer conn.Close()

//通过刚刚的连接 生成一个client对象。

c := pro.NewGreeterClient(conn)

//调用服务端推送流

reqstreamData := &pro.StreamReqData{Data:"aaa"}

res,_ := c.GetStream(context.Background(),reqstreamData)

for {

aa,err := res.Recv()

if err != nil {

log.Println(err)

break

}

log.Println(aa)

}

//客户端 推送 流

putRes, _ := c.PutStream(context.Background())

i := 1

for {

i++

putRes.Send(&pro.StreamReqData{Data:"ss"})

time.Sleep(time.Second)

if i > 10 {

break

}

}

//服务端 客户端 双向流

allStr,_ := c.AllStream(context.Background())

go func() {

for {

data,_ := allStr.Recv()

log.Println(data)

}

}()

go func() {
    for {
        allStr.Send(&pro.StreamReqData{Data:"ssss"})
        time.Sleep(time.Second)
    }
}()

select {
}

}

client 调用 流的函数, 就会 返回一个 流对象,只要 不断地 对它进行读取或者写入,对应方就能收到。

总结:

grpc 的 stream 和 go的协程 配合 简直完美。通过流 我们 可以更加 灵活的 实现自己的业务。如 订阅,大数据传输等。

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK