Can a container's volume mount be mounted directly onto the root directory? — A look at the call path from kubelet to runc


Prologue

This started with someone in a group chat asking: in K8s, can a volume mount be mounted directly onto the root directory? My first reaction was no. A container uses a union filesystem to mount the container's content at the root directory, and under normal circumstances that cannot be changed. But does it end there? The moment I realized I couldn't give a proper explanation, it hit me that my understanding of containers had stayed at a very superficial level.

First, as our earlier analysis showed, the OCI runtime actually lets us define the root mount however we like. And in practice I have indeed used an arbitrary local directory as the rootfs before.

One more aside: in fact neither K8s (kubelet) nor containerd restricts what gets mounted as the root directory. With containerd you can even point ctr at a rootfs directory directly via --rootfs. So what about under K8s? In this article we will take a quick walk through the call chain from kubelet down to runc.
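To make that first point concrete: an OCI runtime only reads root.path from the bundle's config.json and uses whatever directory it finds there as the rootfs. Below is a minimal sketch using the runtime-spec Go types; the /tmp/my-rootfs path is an invented example, and real bundles carry many more fields than shown here.

package main

import (
	"encoding/json"
	"os"

	specs "github.com/opencontainers/runtime-spec/specs-go"
)

func main() {
	// The runtime does not care where the rootfs lives; it only reads
	// root.path. Any directory with the files you want works.
	spec := specs.Spec{
		Version: specs.Version,
		Root: &specs.Root{
			Path:     "/tmp/my-rootfs", // hypothetical, hand-prepared directory
			Readonly: false,
		},
		Process: &specs.Process{
			Cwd:  "/",
			Args: []string{"/bin/sh"},
		},
	}
	// This would become the bundle's config.json.
	_ = json.NewEncoder(os.Stdout).Encode(spec)
}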

1. How Kubelet Creates Containers

1.1 Kubelet's Event Loop

At kubelet's core sits one big event loop. The pieces most relevant to this post are configCh and handler: the former receives pod configuration, the latter does the work according to it. The other channels handle periodic pod syncs, pod cleanup, and pod lifecycle events; for now let's focus only on pod creation:

// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1.  configCh:       a channel to read config events from
// 2.  handler:        the SyncHandler to dispatch pods to
// 3.  syncCh:         a channel to read periodic sync events from
// 4.  housekeepingCh: a channel to read housekeeping events from
// 5.  plegCh:         a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated.  In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
//   - configCh: dispatch the pods for the config change to the appropriate
//     handler callback for the event type
//   - plegCh: update the runtime cache; sync pod
//   - syncCh: sync all pods waiting for sync
//   - housekeepingCh: trigger cleanup of pods
//   - health manager: sync pods that have failed or in which one or more
//     containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)

	case e := <-plegCh:
	  ...
	case <-syncCh:
	  ...
	case update := <-kl.livenessManager.Updates():
		  ...
	case update := <-kl.readinessManager.Updates():
	  ...
	case update := <-kl.startupManager.Updates():
		...
	case <-housekeepingCh:
		...
	}
	return true
}

The implementation of this handler interface is kubelet itself, and the various handler methods in turn delegate to podWorkers:

// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	...
	kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

podWorkers.UpdatePod is long, so let's focus on the pod-creation path. A creation request is handed to the managePodLoop method and ultimately delegated to podWorkers.syncPodFn, and this syncPodFn loops right back to kubelet's own syncPod. It sounds convoluted, but really the Kubelet type just outsources the branching and event generation of the pod sync process to podWorkers. podWorkers itself is fairly complex, but rather than dissecting that whole pile of code we will only look at its definition, whose comment lays out what happens across the pod lifecycle:

// ---|                                         = kubelet config has synced at least once
// -------|                                  |- = pod exists in apiserver config
// --------|                  |---------------- = CouldHaveRunningContainers() is true
//
//	^- pod is observed by pod worker  .
//	.                                 .
//
// ----------|       |------------------------- = syncPod is running
//
//	. ^- pod worker loop sees change and invokes syncPod
//	. .                               .
//
// --------------|                     |------- = ShouldPodContainersBeTerminating() returns true
// --------------|                     |------- = IsPodTerminationRequested() returns true (pod is known)
//
//	. .   ^- Kubelet evicts pod       .
//	. .                               .
//
// -------------------|       |---------------- = syncTerminatingPod runs then exits without error
//
//	        . .        ^ pod worker loop exits syncPod, sees pod is terminating,
//					 . .          invokes syncTerminatingPod
//	        . .                               .
//
// ---|    |------------------|              .  = ShouldPodRuntimeBeRemoved() returns true (post-sync)
//
//	.                ^ syncTerminatingPod has exited successfully
//	.                               .
//
// ----------------------------|       |------- = syncTerminatedPod runs then exits without error
//
//	.                         ^ other loops can tear down
//	.                               .
//
// ------------------------------------|  |---- = status manager is waiting for PodResourcesAreReclaimed()
//
//	.                         ^     .
//
// ----------|                               |- = status manager can be writing pod status
//
//	^ status manager deletes pod because no longer exists in config
//

So where do these events come from? It works much like an informer, except that the lower-level Reflector is used directly (and the Reflector is only one of several pod sources). The Reflector runs a listAndWatch and drops changes into a channel; a goroutine keeps consuming the watched Pods and pushes them into configCh. If you're interested, have a look at podStorage. The overall flow is roughly: API server (or another source) → Reflector listAndWatch → podStorage merge → configCh → syncLoopIteration.
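To make the shape of that pipeline concrete, here is a deliberately simplified, self-contained sketch. The PodUpdate and Op types below only mimic the real kubetypes.PodUpdate; the actual path goes through Reflector and podStorage, with merging and deduplication omitted here.

package main

import "fmt"

// Stand-ins that mimic kubetypes.PodUpdate and friends; not the real kubelet types.
type Op string

const (
	ADD    Op = "ADD"
	UPDATE Op = "UPDATE"
)

type Pod struct{ Name string }

type PodUpdate struct {
	Op     Op
	Pods   []Pod
	Source string
}

func main() {
	watchCh := make(chan Pod)        // what a Reflector's watch would feed
	configCh := make(chan PodUpdate) // what syncLoopIteration reads from

	// Roughly what podStorage does: consume watch events and republish
	// them on configCh as typed updates.
	go func() {
		for p := range watchCh {
			configCh <- PodUpdate{Op: ADD, Pods: []Pod{p}, Source: "api"}
		}
		close(configCh)
	}()

	go func() {
		watchCh <- Pod{Name: "nginx"}
		close(watchCh)
	}()

	// The sync-loop side: dispatch on Op, as syncLoopIteration does.
	for u := range configCh {
		switch u.Op {
		case ADD:
			fmt.Println("HandlePodAdditions:", u.Pods[0].Name, "from", u.Source)
		case UPDATE:
			fmt.Println("HandlePodUpdates:", u.Pods[0].Name)
		}
	}
}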

1.2 Kubelet and the CRI

The member of Kubelet responsible for container management is kubeGenericRuntimeManager, the object that does the actual work inside kubelet's syncPod:

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
}

This is where the real container (CRI) operations happen based on the computed changes, covering sandbox management (for now, think of the sandbox as the pause container, which can share some namespaces with the other containers) plus the ephemeral, init, and normal containers.

Apart from the sandbox, every container type goes through the same start flow. Inside SyncPod there is an anonymous function responsible for container creation:

  // Helper containing boilerplate common to starting all types of containers.
	// typeName is a description used to describe this type of container in log messages,
	// currently: "container", "init container" or "ephemeral container"
	// metricLabel is the label used to describe this type of container in monitoring metrics.
	// currently: "container", "init_container" or "ephemeral_container"
	start := func(typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
			return err
		}

    ...
    
		// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			...
			return err
		}

		return nil
	}

Here we meet a familiar K8s error message: Backing Off restarting container in pod. It simply records a timestamp per container so that a container that keeps failing to start doesn't re-enter creation too frequently.
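The bookkeeping behind that message is small. Here is a rough sketch of the idea using client-go's flowcontrol.Backoff; the key format, durations, and timestamps below are placeholders rather than what kubelet actually uses.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Exponential backoff: 10s initial, capped at 5 minutes (illustrative values).
	backOff := flowcontrol.NewBackOff(10*time.Second, 5*time.Minute)

	// kubelet derives a per-container key from pod and container identity;
	// this format is made up for the example.
	key := "default_mypod_mycontainer"

	// Pretend the container last exited 5 seconds ago.
	lastFinished := time.Now().Add(-5 * time.Second)

	if backOff.IsInBackOffSince(key, lastFinished) {
		fmt.Println("Backing Off restarting container in pod")
		return
	}
	// Not in backoff: record this attempt so the next failure waits longer.
	backOff.Next(key, lastFinished)
	fmt.Println("next delay:", backOff.Get(key))
}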

The real work inside that anonymous function is done by a method called startContainer. Let's take a quick look at what it does (code related to container events and error handling trimmed):

// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	container := spec.container

	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
  ...

	// Step 2: create the container.
	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)

	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)

	// Step 3: start the container.
	err = m.runtimeService.StartContainer(containerID)

	// Symlink container logs to the legacy container log location for cluster logging
	// support.
	// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
	containerMeta := containerConfig.GetMetadata()
	sandboxMeta := podSandboxConfig.GetMetadata()
	legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
		sandboxMeta.Namespace)
	containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
	// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
	// Because if containerLog path does not exist, only dangling legacySymlink is created.
	// This dangling legacySymlink is later removed by container gc, so it does not make sense
	// to create it in the first place. it happens when journald logging driver is used with docker.
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
		if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
			klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
				"containerID", containerID, "containerLogPath", containerLog)
		}
	}

	// Step 4: execute the post start hook.
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		kubeContainerID := kubecontainer.ContainerID{
			Type: m.runtimeName,
			ID:   containerID,
		}
		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
	}

	return "", nil
}

The steps performed here are exactly what the method's doc comment already spells out:

  • First check whether the image exists, and pull it if it doesn't;
  • Generate the CRI config, i.e. containerConfig, cleanupAction, err := m.generateContainerConfig, and create the container with m.runtimeService.CreateContainer;
  • Start the container with m.runtimeService.StartContainer and create a symlink for its logs (for example under /var/log/pods, which is where agents like promtail scrape them);
  • Run the postStart hook.

What matters most for day-to-day K8s usage is how the Pod spec gets translated into the CRI config (m.generateContainerConfig): things like Env (EnvFrom with Secret/ConfigMap, fieldRef/downwardAPI) and how hostPath or emptyDir volumes map to directories on the host. In simplified form it looks like this (an illustrative mount entry follows the code below):

// generateContainerConfig generates container config for kubelet runtime v1.
func (m *kubeGenericRuntimeManager) generateContainerConfig(container *v1.Container, pod *v1.Pod, restartCount int, podIP, imageRef string, podIPs []string, nsTarget *kubecontainer.ContainerID) (*runtimeapi.ContainerConfig, func(), error) {
	...
  
	config := &runtimeapi.ContainerConfig{
		Metadata: &runtimeapi.ContainerMetadata{
			Name:    container.Name,
			Attempt: restartCountUint32,
		},
		Image:       &runtimeapi.ImageSpec{Image: imageRef},
		Command:     command,
		Args:        args,
		WorkingDir:  container.WorkingDir,
		Labels:      newContainerLabels(container, pod),
		Annotations: newContainerAnnotations(container, pod, restartCount, opts),
		Devices:     makeDevices(opts),
		Mounts:      m.makeMounts(opts, container),
		LogPath:     containerLogsPath,
		Stdin:       container.Stdin,
		StdinOnce:   container.StdinOnce,
		Tty:         container.TTY,
	}
  
  ...
  
	return config, cleanupAction, nil
}
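As a concrete (and invented) example, an emptyDir volume named cache mounted at /cache would show up in that config as a mount entry along these lines; the pod UID in the host path is fabricated, and the real path is produced by kubelet's volume manager.

package main

import (
	"fmt"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	// Illustrative only: one entry of ContainerConfig.Mounts for an
	// emptyDir volume named "cache" mounted at /cache in the container.
	m := &runtimeapi.Mount{
		ContainerPath: "/cache",
		HostPath:      "/var/lib/kubelet/pods/6c7ad6b1-1111-2222-3333-444444444444/volumes/kubernetes.io~empty-dir/cache",
		Readonly:      false,
	}
	fmt.Printf("%+v\n", m)
}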

1.3 The CRI Call to Create a Container

One level further down, the CRI finally enters the picture. Kubelet passes the ContainerConfig it just generated to the container runtime over gRPC. From here on we will use containerd as the example and continue the journey.

// excerpt from the generated CRI RuntimeServiceServer interface
type RuntimeServiceServer interface {
	// CreateContainer creates a new container in specified PodSandbox
	CreateContainer(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
	// StartContainer starts the container.
	StartContainer(context.Context, *StartContainerRequest) (*StartContainerResponse, error)
	...
}

type CreateContainerRequest struct {
	// ID of the PodSandbox in which the container should be created.
	PodSandboxId string `protobuf:"bytes,1,opt,name=pod_sandbox_id,json=podSandboxId,proto3" json:"pod_sandbox_id,omitempty"`
	// Config of the container.
	Config *ContainerConfig `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"`
	// Config of the PodSandbox. This is the same config that was passed
	// to RunPodSandboxRequest to create the PodSandbox. It is passed again
	// here just for easy reference. The PodSandboxConfig is immutable and
	// remains the same throughout the lifetime of the pod.
	SandboxConfig        *PodSandboxConfig `protobuf:"bytes,3,opt,name=sandbox_config,json=sandboxConfig,proto3" json:"sandbox_config,omitempty"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

A quick note on containerd's implementation: containerd currently has two CRI implementations, the newer one being built around sandboxes; see the "Sandbox API work" discussion. It points out that sandbox used to be only a concept, not a first-class citizen, but the concept has become more important and better defined in the container world — a sandbox is an isolated environment for running multiple containers — hence the new implementation.

Simply put, it is an entity one level above container; the most intuitive example is a Pod. The maintainers felt that instead of everyone carrying around their own similar implementation, it would be better to have a standard one.

That said, this has little to do with the main thread of this article, since what we are after is the container's rootfs mount.

1.4 "Creating" the Container

The code lives in create_container.go; it's too long to paste in full. "Creating" a container in the CRI sense is really just preparation: the container is not actually running yet. In essence, the code assembles a chain of opts based on the config to initialize a Container object, and that Container object carries just about all the context needed to run the container.

// -------- create_container.go
var cntr containerd.Container
	if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
		return nil, fmt.Errorf("failed to create containerd container: %w", err)
	}
// --------

// NewContainer will create a new container with the provided id.
// The id must be unique within the namespace.
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
	ctx, done, err := c.WithLease(ctx)
	if err != nil {
		return nil, err
	}
	defer done(ctx)

	container := containers.Container{
		ID: id,
		Runtime: containers.RuntimeInfo{
			Name: c.runtime,
		},
	}
	for _, o := range opts {
		if err := o(ctx, c, &container); err != nil {
			return nil, err
		}
	}
	r, err := c.ContainerService().Create(ctx, container)
	if err != nil {
		return nil, err
	}
	return containerFromRecord(c, r), nil
}

For instance, containerd has an important component called the snapshotter, which is one link in this opts chain and is responsible for preparing the container's rootfs.

// -------- create_container.go
  // Grab any platform specific snapshotter opts.
	sOpts := snapshotterOpts(c.config.ContainerdConfig.Snapshotter, config)

  // Set snapshotter before any other options.
	opts := []containerd.NewContainerOpts{
		containerd.WithSnapshotter(c.runtimeSnapshotter(ctx, ociRuntime)),
		// Prepare container rootfs. This is always writeable even if
		// the container wants a readonly rootfs since we want to give
		// the runtime (runc) a chance to modify (e.g. to create mount
		// points corresponding to spec.Mounts) before making the
		// rootfs readonly (requested by spec.Root.Readonly).
		customopts.WithNewSnapshot(id, containerdImage, sOpts...),
	}
// --------

Once the Container object has been initialized, it is persisted to disk via containerd's key-value store (boltDB). The hierarchy is <version>/<namespace>/<object>/<key> -> <field>: each level is a bucket addressable by key, and the innermost bucket stores the container's fields one key at a time. It looks roughly like this (a small sketch of reading this layout follows below):

// Notes:
//
//   - `╘══*...*` refers to maps with arbitrary keys
//
//   - `version` is a key to a numeric value identifying the minor revisions
//     of schema version
//
//   - a namespace in a schema bucket cannot be named "version"
//
//     └──v1                                        - Schema version bucket
//     ├──version : <varint>                     - Latest version, see migrations
//     ╘══*namespace*
//     ├──containers
//     │  ╘══*container id*
//     │     ├──createdat : <binary time>     - Created at
//     │     ├──updatedat : <binary time>     - Updated at
//     │     ├──spec : <binary>               - Proto marshaled spec
//     │     ├──image : <string>              - Image name
//     │     ├──snapshotter : <string>        - Snapshotter name
//     │     ├──snapshotKey : <string>        - Snapshot key
//     │     ├──runtime
//     │     │  ├──name : <string>            - Runtime name
//     │     │  ├──extensions
//     │     │  │  ╘══*name* : <binary>       - Proto marshaled extension
//     │     │  └──options : <binary>         - Proto marshaled options
//     │     └──labels
//     │        ╘══*key* : <string>           - Label value

1.5 Running the Container

Once the Container has been initialized (created), the gRPC request to start it is much simpler; it only needs a container id:

type StartContainerRequest struct {
	// ID of the container to start.
	ContainerId          string   `protobuf:"bytes,1,opt,name=container_id,json=containerId,proto3" json:"container_id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

With the container id, containerd can fetch the previously configured Container and related objects back out of its local database. The simplified flow is shown below; containerd hands the actual running over to an object called a task:

// StartContainer starts the container.
func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) {
	cntr, err := c.containerStore.Get(r.GetContainerId())
	container := cntr.Container

	ctrInfo, err := container.Info(ctx)
	ociRuntime, err := c.getSandboxRuntime(sandbox.Config, sandbox.Metadata.RuntimeHandler)

	taskOpts := c.taskOpts(ctrInfo.Runtime.Name)
	if ociRuntime.Path != "" {
		taskOpts = append(taskOpts, containerd.WithRuntimePath(ociRuntime.Path))
	}
	task, err := container.NewTask(ctx, ioCreation, taskOpts...)

	// wait is a long running background request, no timeout needed.
	exitCh, err := task.Wait(ctrdutil.NamespacedContext())

	// Start containerd task.
	if err := task.Start(ctx); err != nil {
		return nil, fmt.Errorf("failed to start containerd task %q: %w", id, err)
	}

	return &runtime.StartContainerResponse{}, nil
}

This is where snapshotter.Mounts(ctx, r.SnapshotKey) is called to fetch the rootfs mount spec configured earlier, which is passed as a preset parameter in the task-creation request (an example of what those mounts look like follows the code below):

func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
	...
	if r.SnapshotKey != "" {
		if r.Snapshotter == "" {
			return nil, fmt.Errorf("unable to resolve rootfs mounts without snapshotter on container: %w", errdefs.ErrInvalidArgument)
		}

		// get the rootfs from the snapshotter and add it to the request
		s, err := c.client.getSnapshotter(ctx, r.Snapshotter)
		mounts, err := s.Mounts(ctx, r.SnapshotKey)
		spec, err := c.Spec(ctx)
		for _, m := range mounts {
			if spec.Linux != nil && spec.Linux.MountLabel != "" {
				context := label.FormatMountLabel("", spec.Linux.MountLabel)
				if context != "" {
					m.Options = append(m.Options, context)
				}
			}
			request.Rootfs = append(request.Rootfs, &types.Mount{
				Type:    m.Type,
				Source:  m.Source,
				Target:  m.Target,
				Options: m.Options,
			})
		}
	}
  ...
  
  response, err := c.client.TaskService().Create(ctx, request)
  t.pid = response.Pid
}
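For the default overlayfs snapshotter, the mounts returned by s.Mounts(ctx, r.SnapshotKey) and copied into request.Rootfs look roughly like the following; the snapshot IDs (40/41/42) are invented, and the real directories live under the snapshotter's root.

package main

import (
	"fmt"

	"github.com/containerd/containerd/api/types"
)

func main() {
	root := "/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots"
	// Roughly what request.Rootfs ends up containing for an overlayfs snapshot.
	rootfs := []*types.Mount{{
		Type:   "overlay",
		Source: "overlay",
		Options: []string{
			"workdir=" + root + "/42/work",
			"upperdir=" + root + "/42/fs",
			"lowerdir=" + root + "/41/fs:" + root + "/40/fs",
		},
	}}
	// The shim later hands exactly these to mount.All(), i.e. something like:
	//   mount -t overlay overlay -o workdir=...,upperdir=...,lowerdir=... <bundle>/rootfs
	fmt.Println(rootfs[0].Options)
}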

This code is a bit of a maze to read, mainly because containerd uses a plugin initialization mechanism reminiscent of Spring bean management, or of K8s's scheme registration: each package's init func registers an initialization function and declares its dependencies, and that is how each component gets initialized.
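The registration pattern looks roughly like this. It is a minimal sketch of containerd's plugin.Register mechanism as laid out in containerd 1.x; the plugin type, ID, and dependency below are illustrative, not the real tasks-service registration.

package demo

import (
	"github.com/containerd/containerd/plugin"
)

func init() {
	// Every package registers itself in init(); containerd later resolves
	// the graph declared via Requires and calls each InitFn in dependency order.
	plugin.Register(&plugin.Registration{
		Type:     plugin.GRPCPlugin, // illustrative plugin type
		ID:       "my-demo-service", // illustrative ID
		Requires: []plugin.Type{plugin.MetadataPlugin},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			// Fetch an already-initialized dependency from the init context,
			// e.g. the shared metadata (bolt) store.
			m, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}
			return &demoService{metadata: m}, nil
		},
	})
}

type demoService struct {
	metadata interface{}
}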

c.client.TaskService() is initialized through exactly this mechanism. The core of its Create method is as follows:

func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
  
	...
  
	rtime, err := l.getRuntime(container.Runtime.Name)

	_, err = rtime.Get(ctx, r.ContainerID)

	c, err := rtime.Create(ctx, r.ContainerID, opts)

	...
  
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         pid,
	}, nil
}

And that runtime is the shim. So we now know that containerd delegates container lifecycle management to the task, and behind the task it is actually the shim that gets called. But why doesn't the task call the OCI runtime directly?

2. A Brief Look at the shim

As is more or less well known, a shim layer sits between the CRI implementation and the OCI runtime. Setting aside for a moment why it was designed this way: we need a resident process that watches the container's lifecycle and, at the same time, helps the container process communicate with the outside world.

Could that process be the container itself? In my view it could, but it shouldn't. If we made it the container's PID 1, we would be robbing Peter to pay Paul: every application running in the container would have to account for the existence of that special PID 1, which is a hugely invasive design for the container. Making it some other non-PID-1 process inside the container is just as unreasonable, because its lifecycle would be essentially unmanaged: if it died inside the container for whatever reason, some background process would still be needed to bring it back up.

Could that process be containerd or docker itself? It could, but docker has already stepped on that landmine: early docker managed containers from dockerd directly. The problem is that this process mediates between the container and the outside world, for example keeping a tty alive, so if it dies, every connection is severed at once.

So containerd uses a dedicated process to manage each container, and that process is the shim.

2.1 How containerd Interacts with the shim

The overall flow is fairly long-winded; I have condensed it to roughly the following, and interested readers can dig through the code themselves. The main point is that, were it not for the shim-management machinery described above, this would not be complicated at all: it is just assembling arguments and invoking runc.

ShimManager launches a shim process by invoking its binary; the shim returns an address, and from then on ShimManager talks to the shim over RPC. During startup there are three important calls: Create, Wait, and Start, and these calls mostly either set up the shim's context or call into runc.

2.2 Rootfs Handling in the shim

We already know (section 1.5) that the snapshotter assembles the rootfs mounts in the shim request. The code in the shim that interprets these mounts lives in NewContainer, which prepares the rootfs according to the mounts in the request:

// NewContainer returns a new runc container
func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {
  
  ...
  
	for _, m := range r.Rootfs {
		pmounts = append(pmounts, process.Mount{
			Type:    m.Type,
			Source:  m.Source,
			Target:  m.Target,
			Options: m.Options,
		})
	}
  
  rootfs := ""
	if len(pmounts) > 0 {
		rootfs = filepath.Join(r.Bundle, "rootfs")
		if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
			return nil, err
		}
	}
  
  var mounts []mount.Mount
	for _, pm := range pmounts {
		mounts = append(mounts, mount.Mount{
			Type:    pm.Type,
			Source:  pm.Source,
			Target:  pm.Target,
			Options: pm.Options,
		})
	}
	if err := mount.All(mounts, rootfs); err != nil {
		return nil, fmt.Errorf("failed to mount rootfs component: %w", err)
	}
	...
}

func (m *Mount) mountWithHelper(helperBinary, typePrefix, target string) error {
	// helperBinary: "mount.fuse3"
	// target: "/foo/merged"
	// m.Type: "fuse3.fuse-overlayfs"
	// command: "mount.fuse3 overlay /foo/merged -o lowerdir=/foo/lower2:/foo/lower1,upperdir=/foo/upper,workdir=/foo/work -t fuse-overlayfs"
}

2.3 Container and Process in the shim

The shim also creates a Container object of its own for each container, and every container process (the main process as well as those spawned by Exec) is managed through a Process object:

func NewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest) (_ *Container, retErr error) {

	p, err := newInit(
		ctx,
		r.Bundle,
		filepath.Join(r.Bundle, "work"),
		ns,
		platform,
		config,
		opts,
		rootfs,
	)
	container := &Container{
		ID:              r.ID,
		Bundle:          r.Bundle,
		process:         p,
		processes:       make(map[string]process.Process),
		reservedProcess: make(map[string]struct{}),
	}
	return container, nil
}

Process has two implementations: init and process. init is the container's PID 1; process covers the other processes. Both implement their operations as a state machine.

// Process on a system
type Process interface {
	// ID returns the id for the process
	ID() string
	// Pid returns the pid for the process
	Pid() int
	// ExitStatus returns the exit status
	ExitStatus() int
	// ExitedAt is the time the process exited
	ExitedAt() time.Time
	// Stdin returns the process STDIN
	Stdin() io.Closer
	// Stdio returns io information for the container
	Stdio() stdio.Stdio
	// Status returns the process status
	Status(context.Context) (string, error)
	// Wait blocks until the process has exited
	Wait()
	// Resize resizes the process console
	Resize(ws console.WinSize) error
	// Start execution of the process
	Start(context.Context) error
	// Delete deletes the process and its resourcess
	Delete(context.Context) error
	// Kill kills the process
	Kill(context.Context, uint32, bool) error
	// SetExited sets the exit status for the process
	SetExited(status int)
}

A newly created init Process starts in createdState. When Start is called, the code below runs, and on success the process transitions to the running state:

func (s *createdState) Start(ctx context.Context) error {
	if err := s.p.start(ctx); err != nil {
		return err
	}
	return s.transition("running")
}

s.p.start ultimately invokes the runc binary (a hand-rolled equivalent follows the code below):

// Start will start an already created container
func (r *Runc) Start(context context.Context, id string) error {
	return r.runOrError(r.command(context, "start", id))
}
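Since r.command(context, "start", id) just builds an exec.Cmd for the runc binary, the shim's Create/Start pair is morally equivalent to running runc by hand. A sketch, assuming a bundle directory that already contains config.json and rootfs/, and skipping the console, IO, and pid-file plumbing the shim actually sets up:

package main

import (
	"log"
	"os/exec"
)

func main() {
	// "./bundle" is a placeholder; it must already hold config.json and rootfs/.
	if out, err := exec.Command("runc", "create", "--bundle", "./bundle", "demo").CombinedOutput(); err != nil {
		log.Fatalf("runc create: %v: %s", err, out)
	}
	if out, err := exec.Command("runc", "start", "demo").CombinedOutput(); err != nil {
		log.Fatalf("runc start: %v: %s", err, out)
	}
}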

3. Odds and Ends

Having come this far, we can propose a simple and non-hacky way to customize the rootfs: the snapshotter. And containerd supports custom snapshotters out of the box:

Snapshot Plugins

In addition to the built-in Snapshot plugins in containerd, additional external plugins can be configured using GRPC. An external plugin is made available using the configured name and appears as a plugin alongside the built-in ones.

To add an external snapshot plugin, add the plugin to containerd's config file (by default at /etc/containerd/config.toml). The string following proxy_plugin. will be used as the name of the snapshotter and the address should refer to a socket with a GRPC listener serving containerd's Snapshot GRPC API. Remember to restart containerd for any configuration changes to take effect.

[proxy_plugins]
  [proxy_plugins.customsnapshot]
    type = "snapshot"
    address =  "/var/run/mysnapshotter.sock"

See PLUGINS.md for how to create plugins
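For reference, serving that GRPC API is not much code. Below is a sketch along the lines of the example in containerd's PLUGINS.md, wrapping the built-in native snapshotter behind a unix socket; the socket path and root directory are placeholders.

package main

import (
	"log"
	"net"
	"os"

	"google.golang.org/grpc"

	snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
	"github.com/containerd/containerd/contrib/snapshotservice"
	"github.com/containerd/containerd/snapshots/native"
)

func main() {
	// A real custom snapshotter would go here; the built-in "native" one
	// is used just to have something that works.
	sn, err := native.NewSnapshotter("/var/lib/mysnapshotter")
	if err != nil {
		log.Fatal(err)
	}

	// Expose it over containerd's Snapshots GRPC API.
	rpc := grpc.NewServer()
	snapshotsapi.RegisterSnapshotsServer(rpc, snapshotservice.FromSnapshotter(sn))

	// Listen on the socket referenced by proxy_plugins.customsnapshot.address.
	_ = os.RemoveAll("/var/run/mysnapshotter.sock")
	l, err := net.Listen("unix", "/var/run/mysnapshotter.sock")
	if err != nil {
		log.Fatal(err)
	}
	if err := rpc.Serve(l); err != nil {
		log.Fatal(err)
	}
}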

In the next post, we will look at how containerd's default snapshotter manages the container's root filesystem, and how to write a simple custom snapshotter.

If there are mistakes in this article, corrections are much appreciated.

References

man pages: https://man7.org/linux/man-pages/

containerd source: https://github.com/containerd/containerd

runc, OCI: https://github.com/opencontainers

Linux: Mount Shared Subtrees: https://pages.dogdog.run/tech/mount_subtree.html