zl程序教程

您现在的位置是:首页 >  其他

当前栏目

golang 源码分析:nacos服务发现

2023-02-18 16:32:25 时间

https://github.com/alibaba/nacos 是阿里开源的服务发现和配置同步组件,上手非常容易,我们介绍下如何部署,然后看下nacos提供的golang sdk:https://github.com/nacos-group/nacos-sdk-go如何使用,分析下具体的源码实现。

docker run --name nacos-quick -e MODE=standalone -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:2.0.2

Unable to find image 'nacos/nacos-server:2.0.2' locally
2.0.2: Pulling from nacos/nacos-server
9a03b1668b6d: Pull complete 
Digest: sha256:ac66d2fbc1ba432beff88beb165e5012686863d72a5e0f25da06e23c5e7b329d
Status: Downloaded newer image for nacos/nacos-server:2.0.2
db9558d41223b12bd58f2c120ead7d506a50bd40327a3fc6518178b27e50dd99

在nacos 1.X的版本中使用http方式来做服务注册和发现,配置主端口(默认8848);在2.0版本支持了grpc 服务发现:9848 是客户端gRPC请求服务端端口,用于客户端向服务端发起连接和请求9849是服务端gRPC请求服务端端口,用于服务间同步等。

我们实现一个服务注册

curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=20.18.7.10&port=8080'
ok

拉取注册结果

curl -X GET 'http://127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=nacos.naming.serviceName'
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[],"lastRefTime":1667400684240,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[{"instanceId":"20.18.7.10#8080#DEFAULT#DEFAULT_GROUP@@nacos.naming.serviceName","ip":"20.18.7.10","port":8080,"weight":1.0,"healthy":true,"enabled":true,"ephemeral":true,"clusterName":"DEFAULT","serviceName":"DEFAULT_GROUP@@nacos.naming.serviceName","metadata":{},"instanceHeartBeatInterval":5000,"instanceIdGenerator":"simple","ipDeleteTimeout":30000,"instanceHeartBeatTimeOut":15000}],"lastRefTime":1667400719947,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}

同样我们也可以使用nacos的配置中心功能,发布配置

curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=helloWorld"
true

获取配置

curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"
helloWorld

其实golang的sdk就是基于上述api做的封装来实现服务注册与发现的。

我们定义一个服务

syntax = "proto3";
import "google/protobuf/empty.proto";
package grpcnacos;

option go_package = ".;grpcnacos";
service Test{
  rpc Test(google.protobuf.Empty) returns( TestResponse) {};
}

message TestResponse{
  string msg = 1;
}

生成对应的golang代码

mkdir -p ../pkg/protocol/grpcnacos
protoc --go_out=../pkg/protocol/grpcnacos --go_opt=paths=source_relative     --go-grpc_out=../pkg/protocol/grpcnacos  --go-grpc_opt=paths=source_relative grpcnacos.proto

定义grpc服务的实现逻辑

package service

import (
  "context"
  "learn/learn/Nacos/pkg/protocol/grpcnacos"
  "log"

  emptypb "google.golang.org/protobuf/types/known/emptypb"
)

type Service struct {
  grpcnacos.UnimplementedTestServer
}

func (s Service) Test(ctx context.Context, empty *emptypb.Empty) (*grpcnacos.TestResponse, error) {
  log.Println("收到一个请求")
  return &grpcnacos.TestResponse{Msg: "test"}, nil
}

注册我们的服务

package main

import (
  "fmt"
  "learn/learn/Nacos/exp1/service"
  "learn/learn/Nacos/pkg/protocol/grpcnacos"
  "log"
  "net"

  "github.com/nacos-group/nacos-sdk-go/clients"
  "github.com/nacos-group/nacos-sdk-go/common/constant"
  "github.com/nacos-group/nacos-sdk-go/vo"
  "google.golang.org/grpc"
)

func main() {
  server := grpc.NewServer()
  service := service.Service{}
  grpcnacos.RegisterTestServer(server, service)
  port := GenFreePort()
  listen, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  if err != nil {
    log.Fatalf("监听端口:%d失败: %s", port, err.Error())
  }
  // 创建serverConfig
  // 支持多个;至少一个ServerConfig
  serverConfig := []constant.ServerConfig{
    {
      IpAddr: "127.0.0.1",
      Port:   8848,
    },
  }

  // 创建clientConfig
  clientConfig := constant.ClientConfig{
    NamespaceId:         "", // 如果需要支持多namespace,我们可以场景多个client,它们有不同的NamespaceId。当namespace是public时,此处填空字符串。
    TimeoutMs:           50000,
    NotLoadCacheAtStart: true,
    LogLevel:            "debug",
  }

  // 创建服务发现客户端的另一种方式 (推荐)
  namingClient, err := clients.NewNamingClient(
    vo.NacosClientParam{
      ClientConfig:  &clientConfig,
      ServerConfigs: serverConfig,
    },
  )
  if err != nil {
    log.Fatalf("初始化nacos失败: %s", err.Error())
  }
  success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
    Ip:          "127.0.0.1",
    Port:        uint64(port),
    ServiceName: "test-server",
    Weight:      10,
    Enable:      true,
    Healthy:     true,
    Ephemeral:   true,
    Metadata:    map[string]string{"name": "test"},
    ClusterName: "DEFAULT",       // 默认值DEFAULT
    GroupName:   "DEFAULT_GROUP", // 默认值DEFAULT_GROUP
  })
  if err != nil {
    log.Fatalf("注册服务失败: %s", err.Error())
  }

  log.Println("success: ", success)
  log.Printf("服务启动成功;PORT:%d\n", port)
  _ = server.Serve(listen)
}

// GenFreePort 获取一个空闲的端口;端口避免写死,因为要启动多个实例,测试负载均衡
func GenFreePort() int {
  addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
  if err != nil {
    panic(err)
  }
  listen, err := net.ListenTCP("tcp", addr)
  if err != nil {
    panic(err)
  }
  defer listen.Close()
  return listen.Addr().(*net.TCPAddr).Port
}

通过名字获取服务的实力,请求获取结果

package main

import (
  "context"
  "fmt"
  "learn/learn/Nacos/pkg/protocol/grpcnacos"
  "log"

  "github.com/nacos-group/nacos-sdk-go/clients"
  "github.com/nacos-group/nacos-sdk-go/common/constant"
  "github.com/nacos-group/nacos-sdk-go/vo"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/emptypb"
)

func main() {
  // 创建serverConfig
  // 支持多个;至少一个ServerConfig
  serverConfig := []constant.ServerConfig{
    {
      IpAddr: "127.0.0.1",
      Port:   8848,
    },
  }

  // 创建clientConfig
  clientConfig := constant.ClientConfig{
    // 如果需要支持多namespace,我们可以场景多个client,它们有不同的NamespaceId。当namespace是public时,此处填空字符串。
    NamespaceId:         "",
    TimeoutMs:           5000,
    NotLoadCacheAtStart: true,
    LogLevel:            "debug",
  }

  // 创建服务发现客户端的另一种方式 (推荐)
  namingClient, err := clients.NewNamingClient(
    vo.NacosClientParam{
      ClientConfig:  &clientConfig,
      ServerConfigs: serverConfig,
    },
  )
  if err != nil {
    log.Fatalf("初始化nacos失败: %s", err.Error())
  }

  // SelectOneHealthyInstance将会按加权随机轮询的负载均衡策略返回一个健康的实例
  // 实例必须满足的条件:health=true,enable=true and weight>0
  instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
    ServiceName: "test-server",
    GroupName:   "DEFAULT_GROUP",     // 默认值DEFAULT_GROUP
    Clusters:    []string{"DEFAULT"}, // 默认值DEFAULT
  })
  log.Printf("获取到的实例IP:%s;端口:%d", instance.Ip, instance.Port)
  conn, err := grpc.Dial(fmt.Sprintf("%s:%d", instance.Ip, instance.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Fatalf("监听%s:%d失败:%s", instance.Ip, instance.Port, err.Error())
  }
  client := grpcnacos.NewTestClient(conn)
  res, err := client.Test(context.Background(), &emptypb.Empty{})
  if err != nil {
    log.Fatalf("调用TestClient失败: %s", err.Error())
  }
  log.Println(res.Msg)
}

至此我们完成了简单的服务注册和服务发现功能。测试下

% go run learn/Nacos/exp1/server/main.go
2022/11/04 00:04:38 success:  true
2022/11/04 00:04:38 服务启动成功;PORT:56358
2022/11/04 00:04:51 收到一个请求
 % go run learn/Nacos/exp1/client/main.go
2022/11/04 00:04:51 获取到的实例IP:127.0.0.1;端口:56358
2022/11/04 00:04:51 test

我们可以在页面上看下我们服务的注册情况http://127.0.0.1:8848/nacos/#/login用户名密码都是nacos

可以看到,不论是服务端注册还是客户端拉取,我们首先都需要初始化namingService的客户端,它需要两组参数

  namingClient, err := clients.NewNamingClient(
    vo.NacosClientParam{
      ClientConfig:  &clientConfig,
      ServerConfigs: serverConfig,
    },
  )

其中clientConfig配置了客户端,也就是我们的应用允许的超时时间等配置,serverConfigs是一组服务端的地址和端口后,也就是我们的nacos服务的地址,可以配置多个实例实现多活。

对于server端来说是通过RegisterInstance来实现服务的注册的

  success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
    Ip:          "127.0.0.1",
    Port:        uint64(port),
    ServiceName: "test-server",
    Weight:      10,
    Enable:      true,
    Healthy:     true,
    Ephemeral:   true,
    Metadata:    map[string]string{"name": "test"},
    ClusterName: "DEFAULT",       // 默认值DEFAULT
    GroupName:   "DEFAULT_GROUP", // 默认值DEFAULT_GROUP
  })

客户端是通过SelectOneHealthyInstance将会按加权随机轮询的负载均衡策略返回一个健康的实例

  // 实例必须满足的条件:health=true,enable=true and weight>0
  instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
    ServiceName: "test-server",
    GroupName:   "DEFAULT_GROUP",     // 默认值DEFAULT_GROUP
    Clusters:    []string{"DEFAULT"}, // 默认值DEFAULT
  })
  log.Printf("获取到的实例IP:%s;端口:%d", instance.Ip, instance.Port)

使用起来很简单方便又没有。下面分析下源码实现,注册参数定义如下

type RegisterInstanceParam struct {
  Ip          string            `param:"ip"`          //required
  Port        uint64            `param:"port"`        //required
  Weight      float64           `param:"weight"`      //required,it must be lager than 0
  Enable      bool              `param:"enabled"`     //required,the instance can be access or not
  Healthy     bool              `param:"healthy"`     //required,the instance is health or not
  Metadata    map[string]string `param:"metadata"`    //optional
  ClusterName string            `param:"clusterName"` //optional,default:DEFAULT
  ServiceName string            `param:"serviceName"` //required
  GroupName   string            `param:"groupName"`   //optional,default:DEFAULT_GROUP
  Ephemeral   bool              `param:"ephemeral"`   //optional
}

注册的时候先生成了服务的实例信息和心跳信息,然后请求nacos服务进行注册

type Instance struct {

  Valid       bool              `json:"valid"`

  Marked      bool              `json:"marked"`

  InstanceId  string            `json:"instanceId"`

  Port        uint64            `json:"port"`

  Ip          string            `json:"ip"`

  Weight      float64           `json:"weight"`

  Metadata    map[string]string `json:"metadata"`

  ClusterName string            `json:"clusterName"`

  ServiceName string            `json:"serviceName"`

  Enable      bool              `json:"enabled"`

  Healthy     bool              `json:"healthy"`

  Ephemeral   bool              `json:"ephemeral"`

}
type BeatInfo struct {

  Ip          string            `json:"ip"`

  Port        uint64            `json:"port"`

  Weight      float64           `json:"weight"`

  ServiceName string            `json:"serviceName"`

  Cluster     string            `json:"cluster"`

  Metadata    map[string]string `json:"metadata"`

  Scheduled   bool              `json:"scheduled"`

  Period      time.Duration     `json:"-"`

  State       int32             `json:"-"`

}

具体动作的执行是通过我们初始化naming客户端的时候指定的proxy agent执行的,默认的agent是一个httpagent

func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) 
    _, err := sc.serviceProxy.RegisterInstance(util.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)
    sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)

其中注册实例是直接调用的我们前面提到的服务注册的http接口

return proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodPost)
    SERVICE_BASE_PATH           = "/v1/ns"
    SERVICE_PATH                = SERVICE_BASE_PATH + "/instance"

发送心跳是单独启用了一个协程

  go br.sendInstanceBeat(k, beatInfo)

如果当前实例注销,则进行停止心跳,否则进行心跳通信

beatInterval, err := br.serviceProxy.SendBeat(beatInfo)
api := constant.SERVICE_BASE_PATH + "/instance/beat"
result, err := proxy.nacosServer.ReqApi(api, params, http.MethodPut)

具体调用的是

SERVICE_BASE_PATH           = "/v1/ns"
result, err = server.callServer(api, params, method, getAddress(curServer), curServer.ContextPath)

最终调用的agent实现位于 github.com/nacos-group/nacos-sdk-go@v1.1.2/common/http_agent/http_agent.go

type HttpAgent struct {
}  
func (agent *HttpAgent) Get
      get(path, header, timeoutMs, params)  
func (agent *HttpAgent) RequestOnlyResult
      agent.Get
      agent.Post
      agent.Put
      agent.Delete
      bytes, errRead := ioutil.ReadAll(response.Body)
    func (agent *HttpAgent) Request
      agent.Get
      agent.Post
      agent.Put
      agent.Delete

其中get实现如下

func get(path string, header http.Header, timeoutMs uint64, params map[string]string) (response *http.Response, err error) 
  client := http.Client{}
  resp, errDo := client.Do(request)

客户端采用随机策略选取一个实例

func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)
  service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
  return serviceName + constant.SERVICE_INFO_SPLITER + clusters

其中

  SERVICE_INFO_SPLITER        = "@@"

通过list方法获取服务列表

    cacheService, ok := hr.serviceInfoMap.Get(key)
    hr.updateServiceNow(serviceName, clusters)
      result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
        api := constant.SERVICE_PATH + "/list"
  return proxy.nacosServer.ReqApi(api, param, http.MethodGet)

然后解析json

  SERVICE_BASE_PATH           = "/v1/ns"
  SERVICE_PATH                = SERVICE_BASE_PATH + "/instance"
  hr.ProcessServiceJson(result)

获取到实例列表后,就通过随机算法选取一个活着的节点

return sc.selectOneHealthyInstances(service)
      for _, host := range hosts {
    if host.Healthy && host.Enable && host.Weight > 0 {
      cw := int(math.Ceil(host.Weight))
      if cw > mw {
        mw = cw
      }
      result = append(result, host)
    chooser := newChooser(result)
    instance := chooser.pick()

其中选择器定义:

sort.Sort(instance(instances))
      return Chooser{data: instances, totals: totals, max: runningTotal}

选择算法实现:

instance := chooser.pick()
  r := rand.Intn(chs.max) + 1
  i := sort.SearchInts(chs.totals, r)
  return chs.data[i]