知识准备

1、kubernetes中的基本概念如deployment,statefulset,rc,svc,pod等;

2、tensorflow分布式集群。

Tensorflow架构简介

使用Tensorflow进行训练分为单机模式和分布式集群模式

单机模式的比较简单(略)

分布式模式主要包括如下几个角色:

     ps服务器:进行参数处理

     worker服务器:进行梯度计算(GPU Server)

     tensorboard服务器:运行tensorboard展示计算图

     client服务器:运行训练程序

分布式模式下,ps和worker服务器会暴露grpc端口,client服务器通过grpc远程执行训练

Demo代码如下:

ps和worker启动

...
...
def main(unused_args):
  # Create Protobuf ServerDef
  server_def = tf.train.ServerDef(protocol="grpc")

  # Cluster info
  server_def.cluster = tf.train.ClusterSpec({
    "worker": [
        "worker-1:2222",
        "worker-2:2222",
        "worker-3:2222",
        "worker-4:2222"
    ],
    "ps":[
        "ps-1:2222",
        "ps-2:2222"
    ]})

  # Job name
  if not FLAGS.job_name:
    raise ValueError("Empty job_name")
  server_def.job_name = FLAGS.job_name

  # Task index
  if FLAGS.task_id < 0:
    raise ValueError("Invalid task_id: %d" % FLAGS.task_id)
  server_def.task_index = FLAGS.task_id

  # Create GRPC Server instance
  server = tf.train.Server(server_def)

  # join() is blocking, unlike start()
  server.join()


if __name__ == "__main__":
  tf.app.run()

Kubernetes上部署分布式Tensorflow

实现思路

1、ps/worker服务分别对应一个statefulset,ps/worker的个数则是statefulset的副本数,比如要创建3个ps服务器,就创建1个包括3个副本的的statefulset,ps/worker中需要安装tensorflow并启动grpc服务;

==> 为什么使用statefulset,因为分布式集群需要记录集群的cluster_spec,也就是ps和worker节点的grpc_url(包括主机名、端口),通过deployment/rc创建的pod主机名称是随机的,pod重建后主机名会变化,因此没办法记录,而statefulset创建的pod是不会变化的,即使Pod被重新调度创建,比如创建一个副本数为3,名称为ps的statefulset,则pod的名称固定为ps-1,ps-2,ps-3,这样根据名称和个数就知道了grpc url

2、client服务对应一个job,并创建一个svc用于暴露服务通过外部访问(主要是访问jupyter和sshd),client中需要安装tensorflow,jupyter,sshd

==> 为什么使用Job,因为client服务中主要是执行训练脚本,使用job资源,则训练脚本执行完成后Pod会自动删除

3、tensorboard服务对应一个replicaset,并创建一个svc用于包括服务通过外部访问tensorboard,tensorboard中需要安装tfboard并启动服务;

4、创建一个pvc,各个服务的Pod使用pvc共享数据

举例说明:

在Kubernetes中创建一个名为tf-dis-task的tensorflow集群,该集群包括2个ps server和3个worker server

以下资源是创建好之后该集群对应的所有资源

[root@k8s-node1 example]# kubectl --namespace=admin get all --selector=taskname=tf-dis-task
NAME                             READY   STATUS    RESTARTS   AGE
pod/tf-dis-task-ps-0             1/1     Running   0          12m
pod/tf-dis-task-ps-1             1/1     Running   0          12m
pod/tf-dis-task-session-v7j7f    1/1     Running   0          16m
pod/tf-dis-task-tf-board-tbq69   2/2     Running   0          16m
pod/tf-dis-task-worker-0         1/1     Running   0          17m
pod/tf-dis-task-worker-1         1/1     Running   0          17m
pod/tf-dis-task-worker-2         1/1     Running   0          17m

NAME                              TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)          AGE
service/tf-dis-task-session       NodePort    10.10.11.19    <none>        8888:32500/TCP   16m
service/tf-dis-task-session-ssh   NodePort    10.10.240.74   <none>        22:32051/TCP     16m
service/tf-dis-task-tf-board      NodePort    10.10.185.71   <none>        2080:32200/TCP   16m

NAME                                   DESIRED   CURRENT   READY   AGE
replicaset.apps/tf-dis-task-tf-board   1         1         1       16m

NAME                                  READY   AGE
statefulset.apps/tf-dis-task-ps       2/2     12m
statefulset.apps/tf-dis-task-worker   3/3     17m

NAME                            COMPLETIONS   DURATION   AGE
job.batch/tf-dis-task-session   0/1           16m        16m

详细说明

1. 分别创建一个ps和worker的statefulset,对应名称是tf-dis-task-ps和tf-dis-task-worker,对应的pod名称分别为tf-dis-task-ps-0,tf-dis-task-ps-1,tf-dis-task-worker-0,tf-dis-task-worker-1,tf-dis-task-worker-2

这样集群的cluster_spec为:

"worker": [
    "tf-dis-task-worker-0.tf-dis-task-worker:2222",
    "tf-dis-task-worker-1.tf-dis-task-worker:2222",
    "tf-dis-task-worker-2.tf-dis-task-worker:2222"
],
"ps":[
    "tf-dis-task-ps-0.tf-dis-task-ps:2222",
    "tf-dis-task-ps-1.tf-dis-task-ps:2222"
]

在statefuleset中设置环境变量

ps|tf-dis-task-ps-0.tf-dis-task-ps:2222;tf-dis-task-ps-1.tf-dis-task-ps:2222,worker|tf-dis-task-worker-0.tf-dis-task-worker:2222;tf-dis-task-worker-1.tf-dis-task-worker:2222;tf-dis-task-worker-2.tf-dis-task-worker:2222

2. 创建一个client的job,名称为tf-dis-task-session,同时对应创建2个service,分别是tf-dis-task-session,tf-dis-task-session-ssh,分别暴露jupyter的端口(8888)和sshd的端口(22),这个就可以通过宿主机访问容器的jupyter和ssh了

3. 创建一个tensorboard的replicaset,名称为tf-dis-task-tf-board,同时对应创建一个service,名称为tf-dis-task-tf-board,暴露tensorboard的端口

4. ps和worker的statefulset设置环境变量,分别如下:

# PS的环境变量
[root@k8s-node1 example]# kubectl --namespace=admin get statefulsets.apps tf-dis-task-ps -oyaml
...
...
- env:
  - name: TASK_NAME
    value: tf-dis-task
  - name: CLUSTER_SPEC
    value: ps|tf-dis-task-ps-0.tf-dis-task-ps:2222;tf-dis-task-ps-1.tf-dis-task-ps:2222,worker|tf-dis-task-worker-0.tf-dis-task-worker:2222;tf-dis-task-worker-1.tf-dis-task-worker:2222;tf-dis-task-worker-2.tf-dis-task-worker:2222
  - name: RESOURCE_TYPE
    value: compute
  - name: RESOURCE_NAME
    value: ps

# Worker的环境变量
[root@k8s-node1 example]# kubectl --namespace=admin get statefulsets.apps tf-dis-task-worker -oyaml
...
...
- env:
  - name: TASK_NAME
    value: tf-dis-task
  - name: CLUSTER_SPEC
    value: ps|tf-dis-task-ps-0.tf-dis-task-ps:2222;tf-dis-task-ps-1.tf-dis-task-ps:2222,worker|tf-dis-task-worker-0.tf-dis-task-worker:2222;tf-dis-task-worker-1.tf-dis-task-worker:2222;tf-dis-task-worker-2.tf-dis-task-worker:2222
  - name: RESOURCE_TYPE
    value: compute
  - name: RESOURCE_NAME
    value: worker

5. ps和worker的容器启动方式为启动grpc server,startup脚本如下(部分内容)

...
...
function run_tensorflow_cluster() {
    # 根据hostname获取index
    TASK_INDEX=$(hostname | awk -F'-' '{print $NF}')
    SCRIPT_DIR=$( cd ${0%/*} && pwd -P )
    # grpc_tensorflow_server.py就是启动grpc server,内容略
    exec python ${SCRIPT_DIR}/grpc_tensorflow_server.py --cluster_spec=$CLUSTER_SPEC --job_name=$RESOURCE_NAME --task_id=$TASK_INDEX
}

...

case $RESOURCE_TYPE in
    "compute"     ) run_tensorflow_cluster ;;
    "tensorboard" ) run_tensorboard ;;
    "session"     ) run_tensorflow_session ;;
esac

6. client和tensorboard的容器则是分别启动jupyter和tensorboard,内容略

7. 通过以上方式可以ps/worker/client/tfboard可以使用一个镜像,通过RESOUCE_TYPE来区分启动方式

资源调度

kubernetes中的node节点包括如下几个场景

1、Node节点只有CPU;

2、Node节点既有CPU也有GPU;

3、部分Node节点运行其他服务,不想运行tensorflow

4、GPU Node节点的gpu型号不一致

5、部分Node节点只想给某个用户或者某个集群单独使用,不希望其他集群的服务调度

调度的整体方案:

1. 设置nodetype=tensorflow,tensorflow集群只会调度在nodetype=tensorflow的节点

2. 设置cputype,gputype,ps/client/tfboard调度的时候选择cputype,worker调度的时候则根据具体的选择,可以选择cpu或者gpu,gpu可以选择具体型号的gpu

3. 设置pooltype(shared/unshared),节点是否是共享,如果是共享则所有集群的服务都可以调度到节点

4. 设置privatename,该标签只在unshared的情况下使用,如果是独占,则设置privatename=username或者taskname

[root@k8s-node1 example]# kubectl get node --label-columns=nodetype,cputype,gputype,pooltype
NAME        STATUS   ROLES         AGE   VERSION   NODETYPE     CPUTYPE                 GPUTYPE                      POOLTYPE
k8s-node1   Ready    master,node   9d    v1.13.4   tensorflow   intel-xeon-e5-2620-v4   nvidia-geforce-gtx-1080-ti   shared
k8s-node2   Ready    node          9d    v1.13.4   tensorflow   intel-xeon-e5-2620-v4   nvidia-geforce-gtx-1080-ti   shared

存在的问题

1. tensorflow的cpu和gpu镜像不同,但是只有worker需要gpu,因此无法使用同一个镜像,如果是gpu训练的话,则只有worker使用gpu镜像,其他服务还是使用cpu镜像

2. 分布式模式下,各个gprc server启动时会根据cluster_spec信息尝试连接其他的server,这个是有一个超时时间的,对于k8s中的statefulset资源在创建Pod的时候是按顺序启动的,也就是只有tf-dis-task-ps-0创建成功后再去创建tf-dis-task-ps-1/2/...,这个就会有个问题就是在ps/worker比较多的情况下(我们一个项目中有个集群有12个ps,20个worker),task-ps-0最开始创建后开始尝试连接其他server,但是需要很长时间task-ps-11才会创建,在task-ps-11创建成功之前,task-ps-0已经连接超时导致启动失败,该项目中解决方式是整体修改集群创建的方案,每个ps/worker都创建一个rc及一个service,也就是2个ps和3个worker的集群会创建5个副本数为1的rc,名称分别为task-ps-0,task-ps-1,task-worker-0,task-worker-1,task-worker-2,及5个对应同名称的svc,传入到容器中的CLUSTER_SPEC环境变量也直接是svc的名称,这样所有的ps/worker都可以并发创建,并且pod的重新调度也不会导致grpc url的改变

源码参考:https://github.com/Aaron-DH/tensorflow_on_k8s

Logo

开源、云原生的融合云平台

更多推荐