zl程序教程

您现在的位置是:首页 >  其他

当前栏目

golang 源码分析:json格式请求grpc服务的

2023-02-18 16:32:25 时间

gRPC payload 的默认格式是 Protobuf,但是 gRPC-Go 的实现中也对外暴露了 Codec interface ,它支持任意的 payload 编码。我们可以使用任何一种格式,包括你自己定义的二进制格式、flatbuffers、或者JSON 格式。

通过google.golang.org/grpc@v1.50.1/encoding/encoding.go 的注册方法:

func RegisterCodec(codec Codec) {
  if codec == nil {
    panic("cannot register a nil Codec")
  }
  if codec.Name() == "" {
    panic("cannot register Codec with empty string result for Name()")
  }
  contentSubtype := strings.ToLower(codec.Name())
  registeredCodecs[contentSubtype] = codec
}

我们只需要定义我们自定义格式的Codec接口,就可以使用grpc传输我们需要的格式google.golang.org/grpc@v1.50.1/encoding/encoding.go

type Codec interface {
  // Marshal returns the wire format of v.
  Marshal(v interface{}) ([]byte, error)
  // Unmarshal parses the wire format into v.
  Unmarshal(data []byte, v interface{}) error
  // Name returns the name of the Codec implementation. The returned string
  // will be used as part of content type in transmission.  The result must be
  // static; the result cannot change between calls.
  Name() string
}

首先我们自定义一个Codec,根据反射判断传入的参数类型,如果是proto.Message格式就用proto格式序列化和反序列化,如果是string类型(已经序列化成json格式了)我们直接不用处理,如果是其他格式,使用json的序列化方法和反序列化方法来进行处理。

package codec

import (
  "bytes"
  "encoding/json"

  "github.com/gogo/protobuf/jsonpb"
  "github.com/golang/protobuf/proto"
  "google.golang.org/grpc/encoding"
)

func init() {
  encoding.RegisterCodec(JSON{
    Marshaler: jsonpb.Marshaler{
      EmitDefaults: true,
      OrigName:     true,
    },
  })
}

type JSON struct {
  jsonpb.Marshaler
  jsonpb.Unmarshaler
}

// Name is name of JSON
func (j JSON) Name() string {
  return "json"
}

func (j JSON) Marshal(v interface{}) (out []byte, err error) {
  if pm, ok := v.(proto.Message); ok {
    b := new(bytes.Buffer)
    err := j.Marshaler.Marshal(b, pm)
    if err != nil {
      return nil, err
    }
    return b.Bytes(), nil
  }
  if val, ok := v.(string); ok {
    return []byte(val), nil
  }

  return json.Marshal(v)
}

func (j JSON) Unmarshal(data []byte, v interface{}) (err error) {
  if pm, ok := v.(proto.Message); ok {
    b := bytes.NewBuffer(data)
    return j.Unmarshaler.Unmarshal(b, pm)
  }
  if vv, ok := v.(*string); ok {
    *vv = string(data)
    return
  }
  return json.Unmarshal(data, v)
}

引用我们自己定义的codec即可实现注册,因为注册方法encoding.RegisterCodec写在init里面了

下面通过一个例子来使用我们自定义的自适应的codec

syntax = "proto3";
package test;
option go_package = "learn/json/grpc-json/rpc";
//定义服务
service TestService {
    //注意:这里是returns 不是return
    rpc SayHello(Request) returns (Response){
    }
    rpc SayHello1(Request) returns (Response){
    }
}
//定义参数类型
message Request {
    string message=1;
}
message Response {
    string message=1;
}

生成代码

protoc --go-grpc_out=. learn/json/grpc-json/rpc/hello.proto

定义服务端

package rpc

import (
  context "context"
  "fmt"
  
  "google.golang.org/grpc/metadata"

   _ "learn/json/grpc-json/codec"
)

type HelloService struct {
}

func (s *HelloService) mustEmbedUnimplementedTestServiceServer() {}

func (s *HelloService) SayHello(ctx context.Context, r *Request) (*Response, error) {
  md, ok := metadata.FromIncomingContext(ctx)
  fmt.Println("SayHello", ctx, r, md, ok, md["head"])
  return &Response{
    Message: "SayHello",
  }, nil
}
func (s *HelloService) SayHello1(ctx context.Context, r *Request) (*Response, error) {
  fmt.Println("SayHello1", ctx, r)
  return &Response{
    Message: "SayHello1",
  }, nil
}

注意需要在我们的服务端注册我们的codec

 _ "learn/json/grpc-json/codec"

启动server服务

// git submodule add https://github.com/johanbrandhorst/grpc-json-example
package main

import (
  "flag"
  "fmt"
  "io/ioutil"
  "net"
  "os"

  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials"
  "google.golang.org/grpc/grpclog"

  "github.com/johanbrandhorst/grpc-json-example/insecure"

  "learn/learn/json/grpc-json/rpc"
)

var (
  gRPCPort = flag.Int("grpc-port", 10000, "The gRPC server port")
)

var log grpclog.LoggerV2

func init() {
  log = grpclog.NewLoggerV2(os.Stdout, ioutil.Discard, ioutil.Discard)
  grpclog.SetLoggerV2(log)
}

func main() {
  flag.Parse()
  addr := fmt.Sprintf("localhost:%d", *gRPCPort)
  lis, err := net.Listen("tcp", addr)
  if err != nil {
    log.Fatalln("Failed to listen:", err)
  }
  s := grpc.NewServer(
    grpc.Creds(credentials.NewServerTLSFromCert(&insecure.Cert)),
  )
  rpc.RegisterTestServiceServer(s, &rpc.HelloService{})
  // Serve gRPC Server
  log.Info("Serving gRPC on https://", addr)
  log.Fatal(s.Serve(lis))
}

这个时候我们就可以测试我们的json格式传输是不是work

echo -en '\x00\x00\x00\x00\x16{"message":"xiazemin"}' | curl -ss -k --http2 \
        -H "Content-Type: application/grpc+json" \
        -H "TE:trailers" \
        --data-binary @- \
        https://localhost:10000/test.TestService/SayHello | od -bc

返回值是

0000000   000 000 000 000 026 173 042 155 145 163 163 141 147 145 042 072
          \0  \0  \0  \0 026   {   "   m   e   s   s   a   g   e   "   :
0000020   042 123 141 171 110 145 154 154 157 042 175                    
           "   S   a   y   H   e   l   l   o   "   }                    
0000033

可以看到已经成功了,解释下

\x00\x00\x00\x00\x16

的含义,这是http2 的message payload header

  • 第一个自己表示是否压缩 :Compression boolean (1 byte)
  • 后面四个字节表示我们请求数据的大小:Payload size (4 bytes)
  • 我们这\x16 表示我们传输的json的格式大小是22字节,可以自己数一下。

当然我也可以通过go客户端来发送json格式请求,我们先定义一个flag类型来接受curl 的http 头部格式

type arrayFlags []string

func (i *arrayFlags) String() string {
  return fmt.Sprint(*i)
}
func (i *arrayFlags) Set(value string) error {
  *i = append(*i, value)
  return nil
}

把得到的参数注入到metaData里面,然后在启动连接的时候指定我们的编解码格式。

package main

import (
  "context"
  "flag"
  "fmt"
  "net"
  "strings"

  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials"
  "google.golang.org/grpc/metadata"

  "learn/learn/json/grpc-json-example/insecure"
  "learn/learn/json/grpc-json/rpc"
)

type arrayFlags []string

func (i *arrayFlags) String() string {
  return fmt.Sprint(*i)
}
func (i *arrayFlags) Set(value string) error {
  *i = append(*i, value)
  return nil
}

var (
  headers arrayFlags
  addr    string
  port    string
  method  string
  data    string
)

func init() {
  flag.Var(&headers, "H", "-H 'mirror:mirror' -H 'content-type:application/json'")
  flag.StringVar(&addr, "addr", "localhost", "The address of the server to connect to")
  flag.StringVar(&port, "port", "10000", "The port to connect to")
  flag.StringVar(&method, "m", "test.TestService/SayHello", "the method wang to call")
  flag.StringVar(&data, "d", "{}", "the data wang to send")
  flag.Parse()
}

func main() {
  ctx := context.Background()
  if headers != nil {
    md := metadata.MD{}
    for _, header := range headers {
      pairs := strings.Split(header, ":")
      if len(pairs) != 2 {
        panic(fmt.Sprintf("invalid header %s", header))
      } else {
        md[strings.Trim(pairs[0], " ")] = append(md[strings.Trim(pairs[0], " ")], strings.Trim(pairs[1], " "))
      }
    }
    ctx = metadata.NewOutgoingContext(ctx, md)
  }

  conn, err := grpc.DialContext(ctx, net.JoinHostPort(addr, port),
    grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(insecure.CertPool, "")),
    grpc.WithDefaultCallOptions(grpc.CallContentSubtype(rpc.JSON{}.Name())),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()

  c := rpc.NewTestServiceClient(conn)
  resp, err := c.SayHello(ctx, &rpc.Request{Message: "xiazemin"})
  if err != nil {
    panic(err)
  }
  fmt.Println(resp)
 
  reply1 := new(string)
  err = grpc.Invoke(ctx, method, data, reply1, conn)
  if err != nil {
    panic(err)
    }
  fmt.Println("response:")
  fmt.Println(*reply1)
}

这里我们发起了两种请求,一种是普通的grpc请求,另一种就是我们自定定义的json格式,测试下

go run learn/json/grpc-json/client/main.go -H 'head:h1' -H 'head:h2' -d '{"message":"xiazemin"}' -m test.TestService/SayHello -addr 127.0.0.1 -port 10000
message:"SayHello"
response:
{"message":"SayHello"}

可以看到两种方式都是work的,说明了我们的codec具有自适应能力的。

当然,我们也可以定义普通的go类型发起请求,也是能处理的,比如:

  err = grpc.Invoke(ctx, method, map[string]interface{}{"message": "xiaz"}, &reply, conn)
  if err != nil {
    panic(err)
  }
  fmt.Println("response:")
  fmt.Println(string(reply.Msg))

总的来说,grpc框架整体的灵活性还是挺大的,它给我们提供了默认选项,非常好用,生产中我们也可以根据自己的需求灵活自定义。