zl程序教程

您现在的位置是:首页 >  其他

当前栏目

golang源码分析:分布式链路追踪

2023-02-18 16:32:25 时间

在上一节搭完分布式追踪的采集展示链路后,这一节开始分析分布式链路追踪的核心源码。我们知道,分布式追踪的原理是通过traceId串联调用链路上的所有服务和日志:每个服务都有一个自己的spanId,每一次rpc调用都需要生成一个子spanId,通过父子spanId的对应关系构建一个有向无环图,从而实现分布式追踪。因此在业务代码的接入过程中需要实现如下功能:父子span关系的构建,父子span关系的传递(包括context内部的传递和rpc服务之间的传递,后者有可能跨协议,比如在http和grpc协议之间传递),rpc日志的采样,上报等等。每一个厂商都有自己的实现,opentracing定义了统一的标准接口,我们按照标准实现即可。在业务代码中实现包括四步:

1,定义tracer,包括采样配置和agent上报相关的配置,然后放入全局变量中。

2,服务端响应请求的时候解析传入的trace,放入context

3,发起下游调用的时候序列化trace,传递给下游

4,对于业务日志需要串联trace的地方,我们打印带context的日志,从context中提取trace和当前span的信息。

下面我们结合golang源码看下实现

// main wires the demo together: it builds the Jaeger tracer, installs it as
// the global opentracing tracer, starts the gRPC server in a goroutine and
// then runs the HTTP server on the main goroutine.
func main() {
  tracer, closer, err := middleware.NewTracer("rootTracerExample", "127.0.0.1:6831", false)
  if err != nil {
    panic(err)
  }
  // Register Close only after the error check: NewTracer can return a nil
  // closer on failure, and deferring a method call on a nil interface would
  // panic and hide the real error.
  defer closer.Close()
  opentracing.SetGlobalTracer(tracer)
  go grpc.Main()
  http.Main()
}

定义tracer

import (
  "io"
  "time"

  "github.com/opentracing/opentracing-go"
  "github.com/uber/jaeger-client-go"
  jaegercfg "github.com/uber/jaeger-client-go/config"
)

var Tracer opentracing.Tracer

// NewTracer builds a Jaeger-backed opentracing.Tracer for servicename.
//
// The tracer is configured with a constant sampler (Param 1) and a reporter
// that logs spans, flushes its buffer every second and targets the agent at
// addr (host:port). When udp is true, spans are sent through an explicitly
// created UDP transport wrapped in a remote reporter; otherwise the reporter
// is built by the jaeger config package from the Reporter settings.
//
// The caller should Close the returned io.Closer when the tracer is no longer
// needed.
func NewTracer(servicename string, addr string, udp bool) (opentracing.Tracer, io.Closer, error) {
  cfg := jaegercfg.Configuration{
    ServiceName: servicename,
    Sampler: &jaegercfg.SamplerConfig{
      Type:  jaeger.SamplerTypeConst, // constant sampling decision
      Param: 1,                       // 1 samples every trace, 0 samples none
    },
    Reporter: &jaegercfg.ReporterConfig{
      LogSpans:            true,
      BufferFlushInterval: 1 * time.Second,
      LocalAgentHostPort:  addr, // e.g. "127.0.0.1:6831"
    },
  }

  if !udp {
    return cfg.NewTracer()
  }

  // Open the UDP transport only on the path that actually uses it; creating
  // it unconditionally left an unused, never-closed transport behind.
  sender, err := jaeger.NewUDPTransport(addr, 0)
  if err != nil {
    return nil, nil, err
  }
  reporter := jaeger.NewRemoteReporter(sender)
  tracer, closer, err := cfg.NewTracer(
    jaegercfg.Reporter(reporter),
  )
  if err != nil {
    // No tracer was created to own the reporter, so release it here.
    reporter.Close()
    return nil, nil, err
  }
  return tracer, closer, nil
}

为了演示完整效果,我们定义一个http服务和一个grpc服务,完成http调http+http调grpc+grpc调grpc

// Protobuf schema for the demo gRPC service used in this article.
syntax = "proto3";
package test;
option go_package = "learn/Jaeger/exp1/grpc";
// Service definition: two unary RPCs that both take a Request and return a Response.
service TestService {
    // Note: the keyword here is `returns`, not `return`.
    rpc SayHello(Request) returns (Response){
    }
    rpc SayHello1(Request) returns (Response){
    }
}
// Message types used as the RPC argument and result.
message Request {
    string message=1;
}
message Response {
    string message=1;
}

生成下代码

% protoc --go-grpc_out=. learn/Jaeger/exp1/grpc/hello.proto
% protoc --go_out=.  learn/Jaeger/exp1/grpc/hello.proto

定义grpc的服务端代码

package grpc

import (
  context "context"
  "fmt"
  "learn/learn/Jaeger/middleware"
  "log"
  "net"

  grpc "google.golang.org/grpc"
)

// Main runs the demo gRPC server: it creates a server with the tracing
// server interceptor installed, registers HelloService, listens on TCP :8081
// and serves until Serve returns. Listen and serve failures terminate the
// process via log.Fatalf.
func Main() {
  tracing := grpc.UnaryInterceptor(middleware.TraceSpanServerInterceptor())
  server := grpc.NewServer(tracing)
  RegisterTestServiceServer(server, &HelloService{})

  lis, listenErr := net.Listen("tcp", ":8081")
  if listenErr != nil {
    log.Fatalf("failed to listen: %v", listenErr)
  }

  if serveErr := server.Serve(lis); serveErr != nil {
    log.Fatalf("failed to serve: %v", serveErr)
  }
}

// HelloService is the stateless handler type registered with the gRPC server
// in Main.
type HelloService struct{}

// mustEmbedUnimplementedTestServiceServer is an empty marker method.
// NOTE(review): presumably required by the generated TestServiceServer
// interface — confirm against the generated code.
func (s *HelloService) mustEmbedUnimplementedTestServiceServer() {
}

// SayHello handles the SayHello RPC. To demonstrate gRPC-to-gRPC trace
// propagation it dials 127.0.0.1:8081 with the tracing client interceptor
// installed and forwards the request to SayHello1, passing ctx along so the
// interceptor can pick up the current span.
//
// It returns an empty Response on success, or the dial / downstream error.
func (s *HelloService) SayHello(ctx context.Context, r *Request) (*Response, error) {
  // DialContext makes the blocking dial (grpc.WithBlock) honour cancellation
  // of the incoming RPC instead of waiting indefinitely.
  conn, err := grpc.DialContext(ctx, "127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()))
  if err != nil {
    // Fail only this RPC; log.Fatalf here would terminate the whole server.
    return nil, fmt.Errorf("did not connect: %w", err)
  }
  defer conn.Close()
  client := NewTestServiceClient(conn)
  _, err = client.SayHello1(ctx, r)
  fmt.Println("SayHello", ctx)
  if err != nil {
    // Return the downstream error unchanged rather than dropping it.
    return nil, err
  }
  return &Response{}, nil
}
// SayHello1 handles the SayHello1 RPC: it prints the incoming context (for
// inspecting the propagated trace) and replies with an empty Response.
func (s *HelloService) SayHello1(ctx context.Context, r *Request) (*Response, error) {
  fmt.Println("SayHello1", ctx)
  reply := new(Response)
  return reply, nil
}

实现一个简单的http服务

package http

import (
  "fmt"
  "log"
  "net/http"

  "learn/learn/Jaeger/middleware"

  mygrpc "learn/learn/Jaeger/exp1/grpc"

  "google.golang.org/grpc"
)

// Main runs the demo HTTP server on :8080. It registers the /request1 and
// /request2 handlers on a fresh mux and wraps the mux in the ServerTraceSpan
// middleware. It blocks until the server stops and terminates the process
// with a log message if serving fails.
func Main() {
  mux := http.NewServeMux()
  mux.HandleFunc("/request1", request1)
  mux.HandleFunc("/request2", request2)
  // Surface the serve error (e.g. port already in use) instead of dropping it.
  if err := http.ListenAndServe(":8080", middleware.ServerTraceSpan(mux)); err != nil {
    log.Fatalf("failed to serve http: %v", err)
  }
}

// request1 handles /request1. It demonstrates HTTP-to-HTTP trace propagation
// by calling /request2 on this same server through middleware.ClientTraceSpan
// with the incoming request's context, then relays the downstream body to the
// caller. A downstream failure is reported as 502 Bad Gateway.
func request1(w http.ResponseWriter, r *http.Request) {
  url := "http://localhost:8080/request2"

  body, err := middleware.ClientTraceSpan(r.Context(), "GET", url, nil)
  if err != nil {
    // Stop here: previously the handler fell through and kept writing after
    // reporting the error, with a 200 status.
    http.Error(w, err.Error(), http.StatusBadGateway)
    return
  }
  fmt.Fprint(w, string(body))
  fmt.Println("request1", r.Context())
}
// request2 handles /request2. It demonstrates HTTP-to-gRPC trace propagation
// by dialing the gRPC server at 127.0.0.1:8081 with the tracing client
// interceptor installed and calling SayHello with the incoming request's
// context. Dial or call failures are logged and reported as 502 Bad Gateway.
func request2(w http.ResponseWriter, r *http.Request) {
  // DialContext ties the blocking dial (grpc.WithBlock) to the request, so a
  // cancelled request does not wait forever for the connection.
  conn, err := grpc.DialContext(r.Context(), "127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()))
  if err != nil {
    // Fail only this request; log.Fatalf here would kill the whole process.
    log.Printf("did not connect: %v", err)
    http.Error(w, "upstream gRPC service unavailable", http.StatusBadGateway)
    return
  }
  defer conn.Close()
  client := mygrpc.NewTestServiceClient(conn)
  if _, err := client.SayHello(r.Context(), &mygrpc.Request{}); err != nil {
    log.Printf("SayHello failed: %v", err)
    http.Error(w, "upstream gRPC call failed", http.StatusBadGateway)
    return
  }
  fmt.Println("request2", r.Context())
}

我们通过middleware的方式实现trace的传递。对于grpc服务,实现如下

package middleware

import (
  "context"
  "encoding/base64"
  "fmt"
  "strings"

  "github.com/opentracing/opentracing-go"
  "github.com/opentracing/opentracing-go/ext"
  "github.com/siddontang/go/log"
  "github.com/uber/jaeger-client-go"
  "google.golang.org/grpc"
  "google.golang.org/grpc/metadata"
  //"example/constants"
)

// TraceSpanClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial() call.
//
// For example:
//
//     conn, err := grpc.Dial(
//         address,
//         ...,  // (existing DialOptions)
//         grpc.WithUnaryInterceptor(middleware.TraceSpanClientInterceptor()),
//     )
//
// For every unary call it starts a span named "RPC Client <method>" from the
// call context, injects that span's context into the outgoing request
// metadata using the global tracer, invokes the call, and finishes the span
// when the call returns. An injection failure is logged and the call proceeds
// without trace headers.
func TraceSpanClientInterceptor() grpc.UnaryClientInterceptor {
  return func(
    ctx context.Context,
    method string, req, resp interface{},
    cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
  ) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "RPC Client "+method)
    defer span.Finish()

    // Work on a private copy of the outgoing metadata so the map stored in
    // the caller's context is never written to.
    md, ok := metadata.FromOutgoingContext(ctx)
    if ok {
      md = md.Copy()
    } else {
      md = metadata.MD{}
    }
    if injectErr := opentracing.GlobalTracer().Inject(
      span.Context(), opentracing.HTTPHeaders, metadataTextMap(md),
    ); injectErr != nil {
      log.Errorf("Failed to inject trace span: %v", injectErr)
    }
    return invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
  }
}

// TraceSpanServerInterceptor builds a grpc.UnaryServerInterceptor to pass to
// grpc.NewServer, e.g.
//
//     s := grpc.NewServer(
//         ...,  // (existing ServerOptions)
//         grpc.UnaryInterceptor(middleware.TraceSpanServerInterceptor()),
//     )
//
// For each unary request it extracts the parent span context from the
// incoming metadata with the global tracer, starts a span named
// "RPC Server <full method>", stores that span in the handler's context and,
// when the span context is a jaeger.SpanContext, also stores the trace ID
// under the "constants.RequestID" context key.
func TraceSpanServerInterceptor() grpc.UnaryServerInterceptor {
  intercept := func(
    ctx context.Context,
    req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
  ) (interface{}, error) {
    // Read the caller's metadata; fall back to an empty set when absent.
    incoming, found := metadata.FromIncomingContext(ctx)
    if !found {
      incoming = metadata.Pairs()
    }

    parent, extractErr := opentracing.GlobalTracer().Extract(
      opentracing.HTTPHeaders, metadataTextMap(incoming),
    )
    if extractErr == opentracing.ErrSpanContextNotFound {
      log.Info(ctx, "Parent span not found, will start new one.")
    } else if extractErr != nil {
      log.Errorf("Failed to extract trace span: %v", extractErr)
    }

    // Open the server-side span and expose it to the handler through ctx.
    serverSpan := opentracing.StartSpan(
      "RPC Server "+info.FullMethod,
      ext.RPCServerOption(parent),
    )
    defer serverSpan.Finish()
    ctx = opentracing.ContextWithSpan(ctx, serverSpan)

    // Publish the trace ID as the request ID when running on Jaeger.
    if jaegerCtx, isJaeger := serverSpan.Context().(jaeger.SpanContext); isJaeger {
      ctx = context.WithValue(ctx, "constants.RequestID", jaegerCtx.TraceID().String())
    }

    return handler(ctx, req)
  }
  return intercept
}

// binHeaderSuffix is the key suffix that makes encodeKeyValue base64-encode
// the associated value.
// NOTE(review): gRPC's binary-metadata suffix is "-bin" (hyphen); confirm
// whether the underscore here is intentional before relying on it.
const binHeaderSuffix = "_bin"

// metadataTextMap is a gRPC metadata.MD viewed as an opentracing text-map
// carrier: its Set method writes key/value pairs into the metadata and its
// ForeachKey method iterates over them.
type metadataTextMap metadata.MD

// Set stores val under key in the metadata, making metadataTextMap usable as
// an opentracing.TextMapWriter. The pair is first passed through
// encodeKeyValue (lower-cased key, base64 value for binary-suffixed keys).
func (m metadataTextMap) Set(key, val string) {
  k, v := encodeKeyValue(key, val)
  // metadata.MD is a multimap; tracing headers replace any previous values
  // instead of being appended to them.
  m[k] = []string{v}
}

// ForeachKey calls callback once for every key/value pair in the metadata,
// making metadataTextMap usable as an opentracing.TextMapReader. Each pair is
// decoded with metadata.DecodeKeyValue first. Iteration stops at the first
// decode failure (returned wrapped in a descriptive error) or the first
// non-nil error returned by callback (returned as is).
func (m metadataTextMap) ForeachKey(callback func(key, val string) error) error {
  for rawKey, values := range m {
    for i := range values {
      key, val, decodeErr := metadata.DecodeKeyValue(rawKey, values[i])
      if decodeErr != nil {
        return fmt.Errorf("failed decoding opentracing from gRPC metadata: %v", decodeErr)
      }
      if cbErr := callback(key, val); cbErr != nil {
        return cbErr
      }
    }
  }
  return nil
}

// encodeKeyValue prepares a key/value pair for storage in gRPC metadata: the
// key is lower-cased, and when the lower-cased key ends in binHeaderSuffix
// the value is replaced by its standard base64 encoding. Other values are
// returned unchanged.
// Per the original note, the logic was copied from private helpers of the
// grpc metadata package.
func encodeKeyValue(k, v string) (string, string) {
  lowered := strings.ToLower(k)
  if !strings.HasSuffix(lowered, binHeaderSuffix) {
    return lowered, v
  }
  return lowered, base64.StdEncoding.EncodeToString([]byte(v))
}

由于官方默认包里只实现了bin,kv和httpHeader三种格式的carrier,因此对于grpc服务需要自己实现carrier。对于http服务实现如下

package middleware

import (
  "context"
  "fmt"
  "io"
  "io/ioutil"
  "net/http"

  "github.com/opentracing/opentracing-go"
  "github.com/opentracing/opentracing-go/ext"
  "github.com/uber/jaeger-client-go"
)

const TraceHeader = "Http-TraceHeader"

// ClientTraceSpan performs an HTTP request with tracing attached and returns
// the response body.
//
// It starts a span (operation name TraceHeader) from ctx, tags it as an RPC
// client span with the request URL and method, injects the span context into
// the request headers, and sends the request bound to the span-carrying
// context. The span is finished when the function returns.
//
// Parameters:
//   - ctx: context that may carry the parent span; also governs cancellation
//     of the request.
//   - method, url, body: passed to http.NewRequest.
//
// It returns the full response body, or an error if the request could not be
// built, sent, or read.
func ClientTraceSpan(ctx context.Context, method, url string, body io.Reader) (resBody []byte, err error) {
  req, err := http.NewRequest(method, url, body)
  if err != nil {
    // Report the failure to the caller; a bad method or URL must not panic.
    return nil, fmt.Errorf("building %s request for %q: %w", method, url, err)
  }

  span, ctx := opentracing.StartSpanFromContext(ctx, TraceHeader)
  defer span.Finish()
  ext.SpanKindRPCClient.Set(span)
  ext.HTTPUrl.Set(span, url)
  // Tag the method actually used rather than a hard-coded "GET".
  ext.HTTPMethod.Set(span, method)
  if injectErr := span.Tracer().Inject(
    span.Context(),
    opentracing.HTTPHeaders,
    opentracing.HTTPHeadersCarrier(req.Header),
  ); injectErr != nil {
    // Best effort: the request is still sent, just without trace headers.
    fmt.Println("inject trace span failed:", injectErr)
  }
  // WithContext returns a copy; the result must replace req to take effect.
  req = req.WithContext(ctx)

  client := &http.Client{}
  resp, err := client.Do(req)
  if err != nil {
    // resp is nil here, so return instead of falling through to resp.Body.
    return nil, fmt.Errorf("sending %s request to %q: %w", method, url, err)
  }
  defer resp.Body.Close()
  return ioutil.ReadAll(resp.Body)
}

// ServerTraceSpan is HTTP middleware that attaches a tracing span to every
// request before handing it to next.
//
// If the request context already carries a span, a child span "my info" is
// started from it and, when it is a Jaeger span, its trace ID is written to
// the TraceHeader response header. In all cases the span context is then
// extracted from the request headers, a span "RPC Server <RequestURI>" is
// started, and next is served with a context carrying that span. For a Jaeger
// span the trace ID is also stored under the "constants.RequestID" context
// key and written to the TraceHeader response header.
func ServerTraceSpan(next http.Handler) http.Handler {
  handle := func(w http.ResponseWriter, r *http.Request) {
    globalTracer := opentracing.GlobalTracer()

    // A span may already be present in the request context.
    if existing := opentracing.SpanFromContext(r.Context()); existing != nil {
      upstream := existing.Context()
      // Fetch the global opentracing tracer again for the child span.
      if t := opentracing.GlobalTracer(); t != nil {
        child := t.StartSpan("my info", opentracing.ChildOf(upstream))
        defer child.Finish()
        // The global tracer is expected to be Jaeger's, so the span context
        // can be converted to jaeger.SpanContext to read the trace ID
        // (useful for logging).
        if jaegerCtx, isJaeger := child.Context().(jaeger.SpanContext); isJaeger {
          w.Header().Set(TraceHeader, jaegerCtx.TraceID().String())
        }
      }
    }

    // The extract error is deliberately ignored, as in the original flow.
    remote, _ := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
    serverSpan := opentracing.StartSpan("RPC Server "+r.RequestURI, ext.RPCServerOption(remote))
    defer serverSpan.Finish()

    reqCtx := opentracing.ContextWithSpan(r.Context(), serverSpan)
    if jaegerCtx, isJaeger := serverSpan.Context().(jaeger.SpanContext); isJaeger {
      traceID := jaegerCtx.TraceID().String()
      reqCtx = context.WithValue(reqCtx, "constants.RequestID", traceID)
      w.Header().Set(TraceHeader, traceID)
    }
    next.ServeHTTP(w, r.WithContext(reqCtx))
  }
  return http.HandlerFunc(handle)
}

为了方便测试可以把traceID单独提出来放入httpheader里面,测试下

% curl -vi http://127.0.0.1:8080/request1
*   Trying 127.0.0.1:8080...
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> GET /request1 HTTP/1.1
> Host: 127.0.0.1:8080
> User-Agent: curl/7.79.1
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Http-Traceheader: 73f6efd73f361c12
Http-Traceheader: 73f6efd73f361c12
< Date: Sun, 23 Oct 2022 19:46:28 GMT
Date: Sun, 23 Oct 2022 19:46:28 GMT
< Content-Length: 0
Content-Length: 0

<
* Connection #0 to host 127.0.0.1 left intact

效果如下

当然上述实现还是很粗糙的,比如为了方便使用默认的contextKey 传递trace信息,没有实现自定义的Extract和Inject方法,导致client和server各打印了一份trace信息。下一期在源码实现分析的时候介绍如何优化。