炼丹师的优雅内功一:TensorFlow on kubernetes
目前的部署方式,我们通过service 的cluster ip 来做副本的服务发现,还需要对多个job 分别编写配置文件,有许多的重复工作,如果是ps 架构,配置想可能更麻烦,所以我们需要一个在k8s集群上统一的作业发布平台,来代替我们配置一些重复工作。
前面的文章
TensorFlow 基础
是什么
tensorflow 是一个采用数据流图(data flow graph)技术来进行数值计算的开源库
tensor 表示传递的数据为张量(多维数组),flow 代表使用计算图进行运算。数据流图是一个有向图,使用节点
(nodes)和线(edges)来描述数学计算。节点一般表示数学操作(operation),也可以表示数据输入起点和输出终点,边
表示节点之间的输入/输出关系。这些数据边可以传送维度可动态调整的多维数据数组,即张量(Tensor)
。
基础知识
张量
张量是对矢量和矩阵向潜在的更高维度的泛化,Tensorflow在内部将张量表示为基本数据类型的n维数组。可以简单的将它理解为一个多维数组:
3 # 这个是0阶张量,就是标量,shape = []
[1.,2.,3.] # 这个是1阶张量,就是向量(矢量),shape = [3]
[[1.,2.,3.],[4.,5.,6.]] # 这个是2阶张量,就是二维数组,shape = [2, 3]
[[[1.,2.,3.]],[[7.,8.,9.]]] # 这个是3阶张量,就是三维数组,shape = [2,1,3]
TensorFlow内部使用tf.Tensor类的实例来表示张量,每个tf.Tensor有两个属性:
- dtype Tensor: 存储的数据类型,可以为
tf.float32
,tf.int32
,tf.string
。 - shape Tensor: 存储的多维数组中每个维度的数组的元素的个数。
# 定义一个随机数(标量)
random_float = tf.random.uniform(shape=())
# 定义一个有2个元素的零向量
zero_vector = tf.zeros(shape=(2))
# 定义两个2×2的常量矩阵
A = tf.constant([[1., 2.], [3., 4.]])
B = tf.constant([[5., 6.], [7., 8.]])
张量最重要的属性是形状、类型、值 ,可以通过shape
,dtype
,numpy()
方法获取
# 查看矩阵A的形状、类型和值
print(A.shape) # 输出(2, 2),即矩阵的长和宽均为2
print(A.dtype) # 输出<dtype: 'float32'>
print(A.numpy()) # 输出[[1. 2.]
# [3. 4.]]
操作
TensorFlow 里有大量的 操作 (Operation),使得我们可以将已有的张量进行运算后得到新的张量。示例如下:
C = tf.add(A, B) # 计算矩阵A和B的和
D = tf.matmul(A, B) # 计算矩阵A和B的乘积
操作完成之后,C和D的值为
tf.Tensor(
[[ 6. 8.]
[10. 12.]], shape=(2, 2), dtype=float32)
tf.Tensor(
[[19. 22.]
[43. 50.]], shape=(2, 2), dtype=float32)
自动求导
在机器学习中,我们经常需要计算函数的导数。TensorFlow 提供了强大的 自动求导机制 来计算导数。在即时执行模式下,TensorFlow 引入了 tf.GradientTape()
这个 “求导记录器” 来实现自动求导。以下代码展示了如何使用 tf.GradientTape()
计算函数 在 时的导数:
import tensorflow as tf
x = tf.Variable(initial_value=3.)
with tf.GradientTape() as tape: # 在 tf.GradientTape() 的上下文内,所有计算步骤都会被记录以用于求导
y = tf.square(x)
y_grad = tape.gradient(y, x) # 计算y关于x的导数
print(y, y_grad)
输出
tf.Tensor(9.0, shape=(), dtype=float32)
tf.Tensor(6.0, shape=(), dtype=float32)
tf.Tensor(125.0, shape=(), dtype=float32)
tf.Tensor(
[[ 70.]
[100.]], shape=(2, 1), dtype=float32)
tf.Tensor(30.0, shape=(), dtype=float32)
在机器学习中,更加常见的是对多元函数求偏导数,以及对向量或矩阵的求导。这些对于 TensorFlow 也不在话下。以下代码展示了如何使用 tf.GradientTape()
计算函数 在 时分别对 的偏导数。其中 。
tf.Tensor(125.0, shape=(), dtype=float32)
tf.Tensor(
[[ 70.]
[100.]], shape=(2, 1), dtype=float32)
tf.Tensor(30.0, shape=(), dtype=float32)
这里, tf.square()
操作代表对输入张量的每一个元素求平方,不改变张量形状。 tf.reduce_sum()
操作代表对输入张量的所有元素求和,输出一个形状为空的纯量张量(可以通过 axis
参数来指定求和的维度,不指定则默认对所有元素求和)。TensorFlow 中有大量的张量操作 API,包括数学运算、张量形状操作(如 tf.reshape()
)、切片和连接(如 tf.concat()
)等多种类型,可以通过查阅 TensorFlow 的官方 API 文档来进一步了解。
Tensorflow 如何进行分布式训练
当计算资源较大时,难以避免地我们会想到分布式训练,让我们更合理的利用计算资源,从而大幅压缩训练时间,TenSorflow 在tf.distribute.Strategy
中为我们提供了若干分布式策略,让我们适应不同的使用场景
单机多卡 MirroredStrategy
tf.distribute.MirroredStrategy
是一种简单且高性能的,数据并行的同步式分布式策略,主要支持多个 GPU 在同一台主机上训练。使用这种策略时,我们只需实例化一个 MirroredStrategy
策略:
以下代码展示了使用 MirroredStrategy
策略,在 TensorFlow Datasets 中的部分图像数据集上使用 Keras 训练 MobileNetV2 的过程:
import tensorflow as tf
import tensorflow_datasets as tfds
num_epochs = 5
batch_size_per_replica = 64
learning_rate = 0.001
# 实例化MirroredStrategy 策略
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: %d' % strategy.num_replicas_in_sync) # 输出设备数量
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync
# 载入数据集并预处理
def resize(image, label):
image = tf.image.resize(image, [224, 224]) / 255.0
return image, label
# 使用 TensorFlow Datasets 载入猫狗分类数据集,详见“TensorFlow Datasets数据集载入”一章
dataset = tfds.load("cats_vs_dogs", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(resize).shuffle(1024).batch(batch_size)
# 构建模型代码
with strategy.scope():
model = tf.keras.applications.MobileNetV2(weights=None, classes=2)
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
loss=tf.keras.losses.sparse_categorical_crossentropy,
metrics=[tf.keras.metrics.sparse_categorical_accuracy]
)
model.fit(dataset, epochs=num_epochs)
多机多卡 MultiWorkerMirroredStrategy
多机多卡和单机多卡类似,多了些在多台计算机通信相关的配置,需要设置环境变量TF_CONFIG
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:20000", "localhost:20001"]
},
'task': {'type': 'worker', 'index': 0}
})
cluster
说明了整个多机集群的结构和每台机器的网络地址(IP + 端口号)。对于每一台机器,cluster
的值都是相同的;task
说明了当前机器的角色。例如,{'type': 'worker', 'index': 0}
说明当前机器是cluster
中的第 0 个 worker(即localhost:20000
)。每一台机器的task
值都需要针对当前主机进行分别的设置。
示例代码
from __future__ import absolute_import, division, print_function
import argparse
import json
import os
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.keras import layers, models
def make_datasets_unbatched():
BUFFER_SIZE = 10000
# Scaling MNIST data from (0, 255] to (0., 1.]
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
datasets, _ = tfds.load(name='mnist', with_info=True, as_supervised=True)
return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)
def build_and_compile_cnn_model():
model = models.Sequential()
model.add(
layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.Flatten())
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))
model.summary()
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
return model
def decay(epoch):
if epoch < 3: #pylint: disable=no-else-return
return 1e-3
if 3 <= epoch < 7:
return 1e-4
return 1e-5
def main(args):
# MultiWorkerMirroredStrategy creates copies of all variables in the model's
# layers on each device across all workers
# if your GPUs don't support NCCL, replace "communication" with another
strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.AUTO))
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
with strategy.scope():
ds_train = make_datasets_unbatched().batch(BATCH_SIZE).repeat()
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = \
tf.data.experimental.AutoShardPolicy.DATA
ds_train = ds_train.with_options(options)
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = build_and_compile_cnn_model()
# Define the checkpoint directory to store the checkpoints
checkpoint_dir = args.checkpoint_dir
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
# Function for decaying the learning rate.
# You can define any decay function you need.
# Callback for printing the LR at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None): #pylint: disable=no-self-use
print('\nLearning rate for epoch {} is {}'.format(
epoch + 1, multi_worker_model.optimizer.lr.numpy()))
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir='./logs'),
tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
save_weights_only=True),
tf.keras.callbacks.LearningRateScheduler(decay),
PrintLR()
]
# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(ds_train,
epochs=10,
steps_per_epoch=70,
callbacks=callbacks)
# Saving a model
# Let `is_chief` be a utility function that inspects the cluster spec and
# current task type and returns True if the worker is the chief and False
# otherwise.
def is_chief():
return TASK_INDEX == 0
if is_chief():
model_path = args.saved_model_dir
else:
# Save to a path that is unique across workers.
model_path = args.saved_model_dir + '/worker_tmp_' + str(TASK_INDEX)
multi_worker_model.save(model_path)
if __name__ == '__main__':
os.environ['NCCL_DEBUG'] = 'INFO'
tfds.disable_progress_bar()
# to decide if a worker is chief, get TASK_INDEX in Cluster info
tf_config = json.loads(os.environ.get('TF_CONFIG') or '{}')
TASK_INDEX = tf_config['task']['index']
parser = argparse.ArgumentParser()
parser.add_argument('--saved_model_dir',
type=str,
required=True,
help='Tensorflow export directory.')
parser.add_argument('--checkpoint_dir',
type=str,
required=True,
help='Tensorflow checkpoint directory.')
parsed_args = parser.parse_args()
main(parsed_args)
这是一个典型的数据并行的分布式方式,分布式集群的配置信息由TF_CONFIG来传入,strategy 会自动读取环境变量
TF_CONFIG
{
"worker": [
"node1:3333",
"node2:3333",
"node3:3333"
],
"task": {
"index": 0,
"type": "work"
}
}
手动部署
假设我们有三台节点分别为node1,node2,node3 我们把我们在三个节点上部署work0 work1 work2 组成一个3 works 的集群
node1
$ TF_CONFIG='{"cluster":{"worker":["node1:3333","node2:3333","node3:3333"]},"task":{"index":0,"type":"worker"}}' python keras_with_strategy.py
node2
$ TF_CONFIG='{"cluster":{"worker":["node1:3333","node2:3333","node3:3333"]},"task":{"index":1,"type":"worker"}}' python keras_with_strategy.py
node3
$ TF_CONFIG='{"cluster":{"worker":["node1:3333","node2:3333","node3:3333"]},"task":{"index":2,"type":"worker"}}' python keras_with_strategy.py
我们可以看到除了index 不一样其他都一样,
如何将训练任务跑在K8S集群中
构建docker 镜像
FROM tensorflow/tensorflow:1.15.0
RUN pip install tensorflow_datasets -i https://pypi.douban.com/simple
ADD . /root
WORKDIR /root
ENTRYPOINT ["python", "/root/keras_with_strategy.py"]
编写yaml文件
我们这里选择job 作为拉起pod 的上层资源,job 可以说是k8s中完美适配机器学习的一个workload,简单介绍下job 的特点
Job 会创建一个或者多个 Pod,并将继续重试 Pod 的执行,直到指定数量的 Pod 成功终止。 随着 Pod 成功结束,Job 跟踪记录成功完成的 Pod 个数。 当数量达到指定的成功个数阈值时,任务(即 Job)结束。 删除 Job 的操作会清除所创建的全部 Pod。 挂起 Job 的操作会删除 Job 的所有活跃 Pod,直到 Job 被再次恢复执行。
一种简单的使用场景下,你会创建一个 Job 对象以便以一种可靠的方式运行某 Pod 直到完成。 当第一个 Pod 失败或者被删除(比如因为节点硬件失效或者重启)时,Job 对象会启动一个新的 Pod。
apiVersion: batch/v1
kind: Job
metadata:
name: worker-0
spec:
template:
spec:
containers:
- name: worker-0
image: registry.cn-hangzhou.aliyuncs.com/chenzhen-docker-repo/keras_with_stragety:v3
env:
- name: TF_CONFIG
value: '{"cluster":{"worker":["keras-with-strategy-worker-0:3333","keras-with-strategy-worker-1:3333","keras-with-strategy-worker-2:3333"]},"task":{"index":0,"type":"worker"}}'
ports:
- containerPort: 3333
restartPolicy: Never
backoffLimit: 4
---
kind: Service
apiVersion: v1
metadata:
name: keras-with-strategy-worker-0
spec:
type: ClusterIP
selector:
job-name: worker-0
ports:
- port: 3333
上面是work0的部署yaml ,其中job 和service 为了方便就全部部署在default namespace 上,我们其他的部署的化只需要把work-0 改为work-1 或work-2
总结,
目前的部署方式,我们通过service 的cluster ip 来做副本的服务发现,还需要对多个job 分别编写配置文件,有许多的重复工作,如果是ps 架构,配置想可能更麻烦,所以我们需要一个在k8s集群上统一的作业发布平台,来代替我们配置一些重复工作
后续
炼丹师的优雅内功三:kubeflow 如何承接机器学习任务
炼丹师的优雅内功二:PyTorch on kubernetes
炼丹师的优雅内功四:volcano 批量处理容器计算业务
炼丹师的优雅内功五:大规模计算集群的秘密RDMA网络,IB网络和RoCe的纠葛
更多推荐
所有评论(0)