zl程序教程

您现在的位置是:首页 >  Java

当前栏目

图解K8s源码 - kube-apiserver篇

2023-02-18 16:23:54 时间

在进入组件源码分析前我们先来看下在k8s中创建一个Pod资源对象的流程是怎样的:

  1. 使用 kubectl 工具向 kube-apiserver 发起创建 pod 资源对象请求;
  2. kube-apiserver 验证请求并将其持久化保存到 Etcd 集群中;
  3. kube-apiserver 基于 Watch 机制通知 kube-scheduler 调度器;
  4. kube-scheduler 调度器根据预先和优先调度算法为 pod 资源对象选择最优节点并通知 kube-apiserver;
  5. kube-apiserver 将最优节点持久化保存到 Etcd 集群中;
  6. kube-apiserver 通知最优节点上的 kubelet 组件;
  7. kubelet 组件在所在节点上通过与容器进程交互创建容器;
  8. kubelet 组件将容器状态上报至 kube-apiserver;
  9. kube-apiserver 将容器状态持久化保存到 Etcd 集群中。

kube-apiserver

kube-apiserver 组件,它负责将 k8s 中的“资源组/资源版本/资源”以 RESTful 风格的形式对外暴露并提供服务。k8s 集群中的所有组件都通过kube-apiserver组件操作资源对象。kube-apiserver 属于核心组件,对于整个集群至关重要,它具有以下重要特性:

  • 将 k8s 系统中的所有资源对象都封装成 RESTful 风格的API接口进行管理
  • 可进行集群状态管理和数据管理,是唯一与 Etcd 集群交互的组件。
  • 拥有丰富的集群安全访问机制,以及认证、授权及准入控制器。
  • 提供了集群各组件的通信和交互功能。

kube-apiserver 中的组件

kube-apiserver 提供了3种 HTTP Server 服务用于将 kube-apiserver组件功能进行解耦,分别是 APIExtensionsSerer、KubeAPIServer、AggregatorServe。它们可以通过 kubectl 工具或接口进行资源管理,架构如下图:

  • APIExtensionsSerer:提供API扩展服务,开发者可以对CRD (CustomResourceDefinitions)自定义资源对象和 CR(CustomResource)进行管理,并通过资源注册表管理CRD相关资源。它是 Delegation 的最后一环,如果对应 CR 不能被处理的话则会返回 404。
  • KubeAPIServer:提供API核心服务,API核心服务通过 Master 对象进行管理,并通过资源注册表管理 Master 相关资源。它负责对请求的一些通用处理,认证、鉴权等,以及处理各个内建资源的 REST 服务。
  • AggregatorServe:提供AA (APIAggregator)聚合服务,API聚合服务通过 APIAggregator 对象进行管理,并通过资源注册表管理AA相关资源。它的暴露的功能类似于一个七层负载均衡,将来自用户的请求拦截转发给其他服务器,并且负责整个 APIServer 的 Discovery 功能。

Aggregator 和 APIExtensionsServer 对应两种主要扩展 APIServer 资源的方式,即分别是 AA 和 CRD。

kube-apiserver启动流程

在 kube-apiserver 组件启动过程中首先是将k8s所支持的资源注册到Scheme资源注册表中这样后面启动的逻辑才能够从Scheme资源注册表中拿到资源信息并启动和运行 APIExtensionsServer、KubeAPIServer、AggregatorServer 这3种服务。

然后是通过 Cobra 命令行进行参数解析, k8s 中所有组件都使用 Cobra 库来解析命令行参数。kube-apiserver 组件通过 Cobra 填充配置参数默认值并验证参数。

在 cmd/kube-apiserver/app/server.go 下的 Run 方法启动主逻辑。Run 方法中首先调用了 CreateServerChain 构建服务调用链,然后执行 server.PrepareRun()。

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
  // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())

    server, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

    prepared, err := server.PrepareRun()
    if err != nil {
        return err
    }

    return prepared.Run(stopCh)
}

调用链中的内容代码如下:

func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    // 1、为 kubeAPIServer 创建配置
    kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
    if err != nil {
      return nil, err
    }

    // 2、判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig 
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
      serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
    if err != nil {
      return nil, err
    }
    
    // 3、初始化 APIExtensionsServer
    notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
    if err != nil {
      return nil, err
    }

    // 4、初始化 KubeAPIServer
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
    if err != nil {
      return nil, err
    }

    // 5、创建 AggregatorConfig
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
    if err != nil {
      return nil, err
    }
    
    // 6、初始化 AggregatorServer
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
      return nil, err
    }

    return aggregatorServer, nil

该方法实际上完成了server初始化,里面包含APIExtensionsServer、KubeAPIServer、AggregatorServer三个server,它们通过委托模式连接在一起的,初始化过程都是类似的,首先为每个server创建对应的config,然后初始化 http server,最终返回 aggregatorapiserver.APIAggregator 实例。初始化流程主要有:http filter chain 的配置、API Group 的注册、http path与handler 的关联以及 handler 后端存储 etcd 的配置。

我将上述 CreateServerChain 构建服务调用链方法做成了流程图,见下图:

server.PrepareRun()主要完成了健康检查、存活检查和OpenAPI路由的注册工作,最后调用 prepared.Run 方法来启动安全的 http server。

kube-apiserver 权限控制及认证

kube-apiserver 作为k8s集群请求入口,接收集群中组件和客户端访问请求, kube-apiserver 对接口访问请求提供了3种安全权限控制,每个请求都需要经过认证、授权及准入控制器才有权限操作资源对象。

认证

k8s支持同时开启多个认证功能,目前 kube-apiserver 提供了9种认证机制,每一种机制被实例化后成为认证器(Authenticator),认证器被封装在 http.Handler 请求处理函数中。当客户端请求通过任一认证器并返回true时,则表示认证通过。

源码位置:vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go 45

func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
    if auth == nil {
      klog.Warning("Authentication is disabled")
      return handler
    }
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        ...
        resp, ok, err := auth.AuthenticateRequest(req)
        ...
        if err != nil || !ok {
          ...
          failed.ServeHTTP(w, req)
          return
        }

       ...
        // authorization header is not required anymore in case of a successful authentication.
        req.Header.Del("Authorization")

        req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
        handler.ServeHTTP(w, req)
      })
}
  • RequestHeader:代理客户端发送的认证请求,使用请求头将用户名和组信息发送给kube-apiserver,当客户端发送认证请求时,kube-apiserver根据Header Values中的用户名列表来识别用户。
  • BasicAuth:基于HTTP协议的认证机制,客户端将用户名、密码写入请求头,认证时HTTP服务端从请求头通过base64解码出用户及密码信息,从而实现身份认证。
  • ClientCA:也称为TLS双向认证,即服务端与客户端互相验证CA签字过的证书的正确性。
  • TokenAuth :基于客户端向服务端提供的Token的认证,Token一般是一个字符串。
  • BootstrapToken :是一种引导Token的机制,客户端的Token信息与服务端的Token相匹配,则认证通过,自动为节点颁发证书,通过paseToken函数解析出Token ID和Token Secret,验证Token Secret中的Expire(过期)、Data、Type等。
  • WebhookTokenAuth:也被称为钩子,是一种基于HTTP协议的回调机制,当客户端将认证请求发到kube-apiserver时,kube-apiserver回调钩子方法,将验证信息发送给远程 webhook 服务器进行认证,根据webhook 服务器返回的状态码判断是否验证成功。
  • Anonymous:未被其他认证器拒绝的请求都可视为匿名请求。
  • OIDC:(OpenID Connect)基于OAuth 2.0协议的轻量级认证规范,它提供了通过API进行身份交互的框架。它的认证流程简言之就是用户先到身份提供商通过认证服务认证自己,拿到access_token、id_token和refresh_token;用户通过指定token参数将其写入 kubeconfig 文件中通过kubectl发到kube-apiserver;kube-apiserver检查配置文件中的证书确保JWT签名有效、id_token未过期等后通过验证。
  • ServiceAccountAuth:也被称为服务账户令牌,与其他认证机制不同,该机制从pod资源内部访问kube-apiserver,提供给运行在pod资源中的进程使用,为Pod资源中的进程提供必要的身份证明,从而获取集群的信息。ServiceAccountAuth认证通过Kubernetes资源的Service Account实现。

这里稍微聊下Kubernetes的两类用户:

  • 普通用户:普通用户,一般由外部独立服务管理,前面介绍的认证机制(如BasicAuth、OIDC认证等)都属于普通用户,Kubernetes没有为这类用户设置用户对象。
  • 服务账户:服务账户,是由Kubernetes API Server管理的用户,它们被绑定到指定的命名空间,由Kubernetes API Server自动或手动创建。Service Account是为了Pod资源中的进程方便与Kubernetes API Server进行通信而设置的。

参考:

《Kubernetes源码剖析》

https://kubernetes.io/zh-cn/docs/concepts/overview/kubernetes-api/

https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#createkubeapiserverconfig

END