之前的两篇blog介绍了kubelet启动pod的过程,但没有介绍存储挂载的过程,这里还是补充说一下,
首先看kubelet启动时候

// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

看Run启动方法,如下:

func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
    defer runtime.HandleCrash()

    go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
    glog.V(2).Infof("The desired_state_of_world populator starts")

    glog.Infof("Starting Kubelet Volume Manager")
    go vm.reconciler.Run(stopCh)

    <-stopCh
    glog.Infof("Shutting down Kubelet Volume Manager")
}

这个里面最总要的是reconciler.Run,这个是个方法pkg/kubelet/volumemanager/reconciler/reconciler.go

func (rc *reconciler) Run(stopCh <-chan struct{}) {
    // Wait for the populator to indicate that it has actually populated the desired state of world, meaning it has
    // completed a populate loop that started after sources are all ready. After, there's no need to keep checking.
    wait.PollUntil(rc.loopSleepDuration, func() (bool, error) {
        rc.reconciliationLoopFunc(rc.populatorHasAddedPods())()
        return rc.populatorHasAddedPods(), nil
    }, stopCh)
    wait.Until(rc.reconciliationLoopFunc(true), rc.loopSleepDuration, stopCh)
}

通过定时任务定期同步,reconcile就是一致性函数,保存desired和actual状态一致。

func (rc *reconciler) reconcile() {

//先确保应该解挂先解挂(unmounted),这个放在第一是因为这个存储可能将要被本主机上面其它的容器所使用,这里先释放挂载,如果实际上是挂载的,但理想状态不是挂载,则执行解挂操作。
    for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld)
...

//确保存储是attached和mounted的状态,如果没有挂载主机则执行AttachVolume挂载操作,如果没有mount怎执行MountVolume操作。
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld,
                isRemount)

}

//最后确保应当解挂的存储解挂(detached 和unmounted),这个上面第一个的区别是,上面只是解挂unmounted,下面则是包括了存储detach和unmount两个步骤。
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {    
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&

这样存储就可以加载到主机attach,并挂载到容器目录mount。
我觉得我这里还是严格区分一下这两个词,不然大家可能混淆,我称attach是加载,是把磁盘加载到主机,mount叫做挂载,包括格式化磁盘和mount到本地plugin目录下面。
那么上面的MountVolume、UnmountVolume这些operator是啥呢?是接口,这不是废话,kubernetes为了能兼容各种存储,每个存储都自己去实现这个方法,一个通用的flex pkg/volume/flexvolume/attacher.go。

func (a *flexVolumeAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error {
    // Mount only once.
    alreadyMounted, err := prepareForMount(a.plugin.host.GetMounter(), deviceMountPath)
    if err != nil {
        return err
    }
    if alreadyMounted {
        return nil
    }

    call := a.plugin.NewDriverCall(mountDeviceCmd)
    call.Append(deviceMountPath)
    call.Append(devicePath)
    call.AppendSpec(spec, a.plugin.host, nil)

    _, err = call.Run()
    if isCmdNotSupportedErr(err) {
        // Devicepath is empty if the plugin does not support attach calls. Ignore mountDevice calls if the
        // plugin does not implement attach interface.
        if devicePath != "" {
            return (*attacherDefaults)(a).MountDevice(spec, devicePath, deviceMountPath, a.plugin.host.GetMounter())
        } else {
            return nil
        }
    }
    return err
}

这个里面就可以通过自己定义挂载解挂的二进制文件可执行文件去具体挂载或者解挂了。在kubelet启动地方加入参数–volume-plugin-dir=/usr/libexec/kubernetes/kubelet-plugins/volume/exec/就可以引入第三方存储插件了。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐