zl程序教程

您现在的位置是:首页 >  其他

当前栏目

mac 上学习k8s系列(52)goreplay流量录制

2023-02-18 16:32:25 时间

goreplay(https://github.com/buger/goreplay)是基于libpcap的流量录制工具,它依赖包https://github.com/google/gopacket,而gopacket是对libpcap和npcap的go封装。可见其流量复制原理和tcpdump一样。我们通过一个简单的go服务器学习下如何使用。

git clone https://github.com/buger/goreplay
cd goreplay
go build -o goreplay .
package main

import (
  "flag"
  "fmt"
  "net/http"
)

func main() {
  var port int
  flag.IntVar(&port, "port", 8080, "Input your port")
  flag.Parse()
  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
    fmt.Println(r.Host)
    fmt.Fprintf(w, "request:%s", r.RequestURI)
  })
  http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}

启动俩服务器

go run learn/goreplay/http/server.go -port 8081
go run learn/goreplay/http/server.go -port 8082
sudo ./goreplay/goreplay --input-raw :8082 --output-http "http://127.0.0.1:8081"

我们请求8082服务器,发现流量被复制到了8081

% ab -n 100 -c 10 http://127.0.0.1:8082/

goreplay 同时也支持将流量录制到文件,或者es

sudo ./goreplay/goreplay --input-raw :8082 --output-file %Y%m%d.log --output-file-append

首先我们启动es

docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node"  --add-host=host.docker.internal:host-gateway elasticsearch:7.17.6 

测试下es的链接和写入

package main

import (
  "context"
  "log"
  "os"
  "time"

  "github.com/buger/goreplay/proto"
  "github.com/olivere/elastic"
)

type ESRequestResponse struct {
  ReqHost           string `json:"Req_Host"`
  ReqMethod         string `json:"Req_Method"`
  ReqURL            string `json:"Req_URL"`
  ReqBody           string `json:"Req_Body"`
  ReqUserAgent      string `json:"Req_User-Agent"`
  ReqXRealIP        string `json:"Req_X-Real-IP"`
  ReqXForwardedFor  string `json:"Req_X-Forwarded-For"`
  ReqConnection     string `json:"Req_Connection,omitempty"`
  ReqCookies        string `json:"Req_Cookies,omitempty"`
  RespStatusCode    string `json:"Resp_Status-Code"`
  RespBody          string `json:"Resp_Body"`
  RespProto         string `json:"Resp_Proto,omitempty"`
  RespContentLength string `json:"Resp_Content-Length,omitempty"`
  RespContentType   string `json:"Resp_Content-Type,omitempty"`
  RespSetCookie     string `json:"Resp_Set-Cookie,omitempty"`
  Rtt               int64  `json:"RTT"`
  Timestamp         time.Time
}

func main() {
  client, err := elastic.NewSimpleClient(
    elastic.SetURL("http://127.0.0.1:9200"),
    // 设置错误日志
    elastic.SetErrorLog(log.New(os.Stderr, "ES-ERROR ", log.LstdFlags)),
    // 设置info日志
    elastic.SetInfoLog(log.New(os.Stdout, "ES-INFO ", log.LstdFlags)),
  )
  if err != nil {
    log.Println(err)
  }
  exists, err := client.IndexExists("index1").Do(context.Background())
  if err != nil {
    log.Println(err)
  }

  if !exists {
    _, err := client.CreateIndex("index1").Do(context.Background())
    if err != nil {
      log.Println(err)
    }
  }
  log.Println("Initialized Elasticsearch Plugin")
  req := make([]byte, 10240)
  resp := make([]byte, 10240)
  t := time.Now()

  host := ESRequestResponse{
    ReqHost:           string(proto.Header(req, []byte("Host"))),
    ReqMethod:         string(proto.Method(req)),
    ReqURL:            string(proto.Path(req)),
    ReqBody:           string(proto.Body(req)),
    ReqUserAgent:      string(proto.Header(req, []byte("User-Agent"))),
    ReqXRealIP:        string(proto.Header(req, []byte("X-Real-IP"))),
    ReqXForwardedFor:  string(proto.Header(req, []byte("X-Forwarded-For"))),
    ReqConnection:     string(proto.Header(req, []byte("Connection"))),
    ReqCookies:        string(proto.Header(req, []byte("Cookie"))),
    RespStatusCode:    string(proto.Status(resp)),
    RespProto:         string(proto.Method(resp)),
    RespBody:          string(proto.Body(resp)),
    RespContentLength: string(proto.Header(resp, []byte("Content-Length"))),
    RespContentType:   string(proto.Header(resp, []byte("Content-Type"))),
    RespSetCookie:     string(proto.Header(resp, []byte("Set-Cookie"))),
    Timestamp:         t,
    Rtt:               0,
  }

  h, err := client.Index().Index("index1").Type("ESRequestResponse").BodyJson(host).Do(context.Background())
  if err != nil {
    log.Println(err)
  }
  log.Printf("Indexed data with ID %s to index %s, type %s\n", h.Id, h.Index, h.Type)
  return
}
go run learn/goreplay/es/main.go
 curl  -H "Content-Type:application/json" -XGET http://elastic:elastic@127.0.0.1:9200/_cat/indices
green  open .geoip_databases 3pp6kUf0Td6wJ0AvFrm-tQ 1 0 41 0 39.2mb 39.2mb
yellow open index1           mVl0z97WQvqvATHmT_vXnQ 1 1  0 0   226b   226b

发现我们能够创建成功。goreplay录制流量入es非常简单

sudo ./goreplay/goreplay --input-raw-track-response --output-http-track-response --input-raw :8082 --output-http "http://127.0.0.1:8081/" --output-http-elasticsearch 'http://127.0.0.1:9200/gor'

实际测试的时候发现不能成功,原因是使用的写入es的插件,客户端版本太低了,针对es7,笔者实现了一个版本,可以用来替换下goreplay/elasticsearch.go

package main

import (
  "context"
  "log"
  "net/url"
  "os"
  "strings"
  "sync"
  "time"

  "github.com/buger/goreplay/proto"
  elastigo "github.com/mattbaird/elastigo/lib"

  "github.com/olivere/elastic"
)

type ESUriErorr struct{}

func (e *ESUriErorr) Error() string {
  return "Wrong ElasticSearch URL format. Expected to be: scheme://host/index_name"
}

type ESPlugin struct {
  Url     string
  Active  bool
  ApiPort string
  eConn   *elastigo.Conn
  Host    string
  Index   string
  indexor *elastigo.BulkIndexer
  done    chan bool
  client  *elastic.Client
}

func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 {
  sec := d / time.Second
  nsec := d % time.Second
  fl := float64(sec) + float64(nsec)*1e-6
  return int64(fl)
}

type ESRequestResponse struct {
  ReqHost           string `json:"Req_Host"`
  ReqMethod         string `json:"Req_Method"`
  ReqURL            string `json:"Req_URL"`
  ReqBody           string `json:"Req_Body"`
  ReqUserAgent      string `json:"Req_User-Agent"`
  ReqXRealIP        string `json:"Req_X-Real-IP"`
  ReqXForwardedFor  string `json:"Req_X-Forwarded-For"`
  ReqConnection     string `json:"Req_Connection,omitempty"`
  ReqCookies        string `json:"Req_Cookies,omitempty"`
  RespStatusCode    string `json:"Resp_Status-Code"`
  RespBody          string `json:"Resp_Body"`
  RespProto         string `json:"Resp_Proto,omitempty"`
  RespContentLength string `json:"Resp_Content-Length,omitempty"`
  RespContentType   string `json:"Resp_Content-Type,omitempty"`
  RespSetCookie     string `json:"Resp_Set-Cookie,omitempty"`
  Rtt               int64  `json:"RTT"`
  Timestamp         time.Time
}

// Parse ElasticSearch URI
//
// Proper format is: scheme://[userinfo@]host/index_name
// userinfo is: user[:password]
// net/url.Parse() does not fail if scheme is not provided but actually does not
// handle URI properly.
// So we must 'validate' URI format to match requirements to use net/url.Parse()
func parseURI(URI string) (err error, host, index string) {

  parsedUrl, parseErr := url.Parse(URI)

  if parseErr != nil {
    err = new(ESUriErorr)
    return
  }

  //  check URL validity by extracting host and index values.
  host = parsedUrl.Host
  urlPathParts := strings.Split(parsedUrl.Path, "/")
  index = urlPathParts[len(urlPathParts)-1]

  // force index specification in uri : ie no implicit index
  if host == "" || index == "" {
    err = new(ESUriErorr)
  }

  return
}

var initOnce sync.Once

func (p *ESPlugin) Init(URI string) {
  p.Url = URI
  var err error

  err, p.Host, p.Index = parseURI(URI)
  log.Println("Initializing Elasticsearch Plugin", p.Index, p.Host)
  t := time.Now()
  if p.Index == "" {
    p.Index = "gor-" + t.Format("2006-01-02")
  }

  if err != nil {
    log.Fatal("Can't initialize ElasticSearch plugin.", err)
  }

  initOnce.Do(func() {
    p.client, err = elastic.NewSimpleClient(
      elastic.SetURL("http://"+p.Host),
      // 设置错误日志
      elastic.SetErrorLog(log.New(os.Stderr, "ES-ERROR ", log.LstdFlags)),
      elastic.SetBasicAuth("elastic", "elastic"), // 账号密码
      // 设置info日志
      elastic.SetInfoLog(log.New(os.Stdout, "ES-INFO ", log.LstdFlags)),
    )
    if err != nil {
      log.Println(err)
    }
  })

  exists, err := p.client.IndexExists(p.Index).Do(context.Background())
  if err != nil {
    log.Println(err)
  }

  if !exists {
    _, err := p.client.CreateIndex(p.Index).Do(context.Background())
    if err != nil {
      log.Println(err)
    }
  }
  log.Println("Initialized Elasticsearch Plugin")
  return
}

func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) {
  if len(resp) == 0 && len(req) == 0 {
    // nil http response - skipped elasticsearch export for this request
    log.Println("ResponseAnalyze ", resp, req)
    return
  }

  t := time.Now()
  rtt := p.RttDurationToMs(stop.Sub(start))
  req = payloadBody(req)

  host := ESRequestResponse{
    ReqHost:           string(proto.Header(req, []byte("Host"))),
    ReqMethod:         string(proto.Method(req)),
    ReqURL:            string(proto.Path(req)),
    ReqBody:           string(proto.Body(req)),
    ReqUserAgent:      string(proto.Header(req, []byte("User-Agent"))),
    ReqXRealIP:        string(proto.Header(req, []byte("X-Real-IP"))),
    ReqXForwardedFor:  string(proto.Header(req, []byte("X-Forwarded-For"))),
    ReqConnection:     string(proto.Header(req, []byte("Connection"))),
    ReqCookies:        string(proto.Header(req, []byte("Cookie"))),
    RespStatusCode:    string(proto.Status(resp)),
    RespProto:         string(proto.Method(resp)),
    RespBody:          string(proto.Body(resp)),
    RespContentLength: string(proto.Header(resp, []byte("Content-Length"))),
    RespContentType:   string(proto.Header(resp, []byte("Content-Type"))),
    RespSetCookie:     string(proto.Header(resp, []byte("Set-Cookie"))),
    Timestamp:         t,
    Rtt:               rtt,
  }

  h, err := p.client.Index().Index(p.Index).BodyJson(host).Type("_doc").Do(context.Background()) //Type("ESRequestResponse").
  if err != nil {
    log.Println(err)
    return
  }
  log.Printf("Indexed data with ID %s to index %s, type %s\n", h.Id, h.Index, h.Type)
  return
}

替换后重新编译下,开始测试:

 % ab -n 100 -c 10 http://127.0.0.1:8082/data/$RANDOM

查询下流量

 % curl -H "Content-Type:application/json" -XGET http://elastic:elastic@127.0.0.1:9200/gor/_doc/fzsUr4QB4C7FYlos41fc
{"_index":"gor","_type":"_doc","_id":"fzsUr4QB4C7FYlos41fc","_version":1,"_seq_no":2198,"_primary_term":1,"found":true,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"","Req_User-Agent":"ApacheBench/2.3","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":18,"Timestamp":"2022-11-25T21:58:10.711494+08:00"}}
% curl -H "Content-Type:application/json" -XGET http://elastic:elastic@127.0.0.1:9200/gor/_search -d  '{"query":{"match":{"Req_Body":"1213"}}}' 
{"took":13,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":0.4084168,"hits":[{"_index":"gor","_type":"ESRequestResponse","_id":"gjsXr4QB4C7FYlosPVfB","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:17225,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":3,"Timestamp":"2022-11-25T22:00:46.769234+08:00"}},{"_index":"gor","_type":"ESRequestResponse","_id":"gzsXr4QB4C7FYlosXVcM","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:10453,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":1,"Timestamp":"2022-11-25T22:00:54.769936+08:00"}},{"_index":"gor","_type":"ESRequestResponse","_id":"hDsXr4QB4C7FYlosZFfd","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:16202,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":0,"Timestamp":"2022-11-25T22:00:56.769345+08:00"}},{"_index":"gor","_type":"ESRequestResponse","_id":"hTsZr4QB4C7FYlosQVe0","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:23396,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":2,"Timestamp":"2022-11-25T22:02:58.83474+08:00"}}]}}

我们可以通过es快速查询我们需要的流量,用来回放或者测试。