分布式增强学习Ray.tune调参神器HyperOptSearch

近日在研究分布式增强学习框架Ray.tune(深度学习参数优化)模块时,发现里面使用的HyperOptSearch其实是基于HyperOpt(最后章节讲解)(https://github.com/hyperopt/hyperopt)支持的SearchAlgorithm,用于执行基于模型的顺序超参数优化,但是又结合了一些新的算法进行实现。如HyperOpt与最先进的算法(如HyperBand等)结合使用,而无需修改任何模型训练代码。

TuneAPI概览

Tune计划在群集中进行多次试验。每个试验都运行一个用户定义的Python函数或类,并通过Tune的Variant Generator 的配置变体或用户指定的Search Algorithm进行参数化。试验由trial scheduler安排和管理。

底层是可选的参数搜索算法。

上层是Tune两种模型训练方式的API:

  1. 基于函数的api需要使用reporter监控模型状态,每一个批次返回一次指标情况,监控指标可以自定义;
  2. 继承Trainable类的API需要覆盖类方法,一般来说,只需实现_train、_save和_restore子类即可,类方法参数如下:
_setup(config )
    子类应该覆盖它以进行自定义初始化。
    参数:    config(dict) - 给出超参数和其他配置。self.config的副本。

_train():
    运行一次训练的逻辑迭代。此类自动填充结果中的以下字段:
        done (bool):训练终止。仅在未提供时填写。
        time_this_iter_s(float):此迭代运行的时间(以秒为单位)。这可以被覆盖以覆盖系统计算的时间差。
        time_total_s(float):整个实验的累计时间(秒)。
        experiment_id(str):此实验的唯一字符串标识符。在检查点/恢复调用中保留此ID。
        training_iteration(int):此训练迭代的索引,例如对train()的调用。
        pid(str):培训过程的pid。
        date(str):处理结果的格式化日期。
        timestamp(str):处理结果的UNIX时间戳。
        hostname(str):托管培训过程的计算机的主机名。
        node_ip(str):托管培训过程的机器的节点ip。
    返回:打印描述训练进度的字典。
    
_save(checkpoint_dir=None):
    将当前模型状态保存到检查点。仅当 "checkpoint_at_end": True 时才会调用该函数。
    参数:checkpoint_dir(str) - 用于放置检查点的可选目录。
    返回:    
        如果是string,则返回值为可以传递给restore()的检查点路径。
        如果是dict,返回值将由Tune自动序列化并传递给_restore()。
        返回类型:checkpoint(str | dict)
        Examples:
            >>> print(trainable1._save("/tmp/checkpoint_1"))
                "/tmp/checkpoint_1/my_checkpoint_file"
            >>> print(trainable2._save("/tmp/checkpoint_2"))
                {"some": "data"}

_restore(checkpoint_path):
    从给定的模型检查点恢复训练状态。
    从调用save()返回这些检查点。
    此方法恢复使用检查点保存的其他元数据。
    参数:    checkpoint(str | dict) - _save返回的值。如果是字符串,则它是检查点路径。

_stop()
    释放此可训练所使用的所有资源。

本次实验我们采用 function-based API :

def trainable(config, reporter):
    """
    Args:
        config (dict): Parameters provided from the search algorithm or variant generation.
        reporter (Reporter): Handle to report intermediate metrics to Tune.
    """
    while True:
        # ...
        reporter(**kwargs)
        # ray.tune.function_runner.StatusReporter:传递给函数的对象,可以通过该函数报告状态。
        # 如:reporter(mean_accuracy=0.99, training_iteration=50)

Tune将在Ray actor进程中的单独线程上运行此函数。注:此API是不可检查的,因为该线程永远不会将控制权返回给其调用者。

本人总结了使用Tune的步骤:

步骤1)

定义神经网络函数train_objective_tune(config, reporter)

对于希望调整的功能参数,我们需要将配置字典传入训练函数里。还要将reporter对象传递给下面的train_objective_tune类。

    def trainable(config, reporter):
        Args:
             config(dict):Parameters provided from the search algorithm or variant generation.
             reporter(Reporter): Handle to report intermediate metrics to Tune.
    
    我们希望在模型训练时跟踪其性能,跟踪的指标可以自定义(比如loss、accuracy)。
    比如:从交叉验证集获取模型精度。
        metric = cross_val_score(gbm,attr_train,label_train,cv=5,scoring="roc_auc").mean()
    比如:从Keras获得mean_accuracy,并向reporter报告每批次的mean_accuracy。
        您可以使用以下代码从Keras获得模型精度:
        loss,mean_accuracy = model.evaluate(x_batch,y_batch)
    比如:loss随迭代的次数而减少
        可实现类似keras的提前停止条件,我们可以通过判断loss在N次没有下降之后返回一个True值到停止参数"stop"里
        callbacks=[EarlyStopping(patience=6), checkpointer] 
        checkpointer = ModelCheckpoint(filepath='/home/byz/ray_results/save_model/_callbacks_class.h5',
                                       verbose=1,
                                       monitor='val_loss',
                                       mode='min',  # auto
                                       save_best_only=True)
    比如:迭代的次数
        for i in range(100):
            reporter(
                timesteps_total = i,
                neg_mean_loss = -(config["height"] - 14)**2 +abs(config["width"] - 3))
            time.sleep(0.02)
步骤2)
    注册上面包装的算法函数
    tune.register_trainable("train_objective_tune", train_objective_tune)
    
步骤3)
    编写实例参数(包括算法参数)-搜索空间 如:
    train_spec = {
        "run": My_Model,
        "trial_resources": {
            "cpu": 20,
            "gpu": 2 # 异步搜索算法下,有几个gpu就可以同时训练几个模型,2gpu就同时训练2个不同参数的模型,剩下参数的模型则等待gpu空闲
        },
        "stop": { # 停止条件(可以指定里面的任何指标)
            "mean_accuracy": 0.2,
            "training_iteration": 10, # 迭代次数
            "stop_loss_num": 3, #loss有5次不下降我们就结束
        },
        "config": {
            "checkpoint_dir": "/home/byz/pro/Ray/checkpoint_dir",
            "epochs": 1,
            "batch_size": 64,
            "lr": grid_search([10**-4, 10**-5]), # 使用网格参数搜索
            # 默认情况下,Tune还支持用户指定的lambda函数的采样参数,这些参数可以单独使用,也可以与网格搜索结合使用。 例如: lambda spec: np.random.uniform(0.1, 100),
            "decay": lambda spec: spec.config.lr / 100.0, #learning rate decay:在训练神经网络的时候,通常在训练刚开始的时候使用较大的learning rate, 随着训练的进行,我们会慢慢的减小learning rate。
            "dropout": grid_search([0.25, 0.5]),
        },
        "num_samples": 4, #这指定了您计划运行的试验次数,从超参数空间中采样的次数。,而不是批次的大小。
    }
    
    可选参数:
        name(str) - 实验名称。
        run(function | class | str) - 要训练的算法或模型。这可以指内置算法的名称(例如RLLib的DQN或PPO),用户定义的可训练函数或类,或者在tune寄存器中注册的可训练函数或类的字符串标识符。
        stop(dict) - 停止标准。键可以是'train()'的返回结果中的任何字段,无论哪个首先到达。默认为清空字典。
        config(dict) - Tune变体生成的特定于算法的配置(例如env,hyperparams)。默认为清空字典。自定义搜索算法可能会忽略它。
        trial_resources(dict) - 每次试验分配的机器资源,例如。请注意,除非您在此处指定GPU,否则不会分配GPU。默认为1个CPU和0个GPU 。{"cpu": 64, "gpu": 8}Trainable.default_resource_request()
        repeat(int) - 已弃用,将在Ray的未来版本中删除。请改用num_samples。
        num_samples(int) - 从超参数空间中采样的次数。默认为1.如果grid_search作为参数提供,则网格将重复 num_samples次。
        local_dir(str) - 将训练结果保存到的本地目录。默认为~/ray_results。
        upload_dir(str) - 用于将训练结果同步到(例如s3://bucket)的可选URI 。
        checkpoint_freq(int) - 检查点之间的训练迭代次数。值0(默认值)禁用检查点。
        checkpoint_at_end(bool) - 无论checkpoint_freq如何,是否在实验结束时保存模型检查点。默认值为False。
        max_failures(int) - 尝试从最后一个检查点恢复试验至少这么多次。仅在启用了检查点时适用。默认为3。
        restore(str) - 检查点的路径。只有在运行1次试验时才有意义。默认为无。
    
步骤4)
    选择调度算法
    scheduler(TrialScheduler) - 用于执行实验的调度程序。在FIFO(默认)、MedianStopping、AsyncHyperBand和HyperBand中进行选择。
    
    如:
    scheduler = HyperBandScheduler(
        time_attr="training_iteration", # 时间单位,绑定max_t
        reward_attr="mean_accuracy",    # 报告的目标值
        max_t=400)                      # 指定每次试验的最大运行时间,单位s
    HyperBandScheduler使用HyperBand优化算法提前停止试验。它将试验划分为不同大小的括号,并定期提前停止每个括号内的低性能试验。
    
    如:
    scheduler = AsyncHyperBandScheduler(
        time_attr="training_iteration", # 时间单位,绑定max_t
        reward_attr="mean_accuracy",    # 报告的目标值
        max_t=400,                      # 指定每次试验的最大运行时间,单位s
        grace_period=20)                # 至少暂时停止试验。单位与time_attr命名的属性相同。(我认为是acc 20次不增加就暂停试验)
    提供与HyperBand类似的理论性能,但要避免HyperBand面临的拖延问题。
    一个实现细节是当使用多个括号时,对括号的试验分配是在softmax概率下随机完成的。

步骤5)
    选择搜索算法
        默认为BasicVariantGenerator,在ray.tune.suggest下,有FIFOScheduler等。
        如:
        algo = HyperOptSearch(space, max_concurrent=4, reward_attr="mean_accuracy")
    
步骤6)
    调用搜索和调度算法,运行各个参数实例
        tune.run_experiments(train_spec, search_alg=algo, scheduler=scheduler)
    
步骤6)
    从搜索过程中获取最佳模型
        best_model = Tune.get_best_model(My_Model._build_model(), trials, metric="mean_accuracy")
        print("Best model...")
    
步骤6)
    试用模型
        HTML(open("input_final.html").read()) ##将您的输入加载到变量final_data中。
        manual_input = prepare_data(final_data)
        best = Tune.best_model.predict(manual_input).argmax()
        first = first_model.predict(manual_input).argmax()
        print("Best model got {}, first model got {}".format(best, first))
        plt.imshow(manual_input.reshape((28,28)))

使用HyperOptSearch对XGboost参数调整

导入依赖:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest import HyperOptSearch

import argparse, time, numpy as np, pandas as pd
from hyperopt import hp

import xgboost
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import cross_val_score,train_test_split #默认情况下,cross_val_score不会随机化数据。旧版sklearn.cross_validation包已废弃

读取数据集:

fileName = "/home/raini/datasets/prostate.csv"
def loadDatasets(fileName):
    data = pd.read_csv(fileName, header=0, index_col=0)
    return data
data = loadDatasets(fileName)
label = data['CAPSULE'].values
labels = label.reshape((1,-1))
attrs = data.drop(columns='CAPSULE')

minmaxscaler = MinMaxScaler()
attrs = minmaxscaler.fit_transform(attrs)

index = [i for i in range(0,len(label))]
np.random.shuffle(index)
trainIndex = index[:int(len(label)*0.7)]
print(len(trainIndex))
testIndex = index[int(len(label)*0.7):]
print(len(testIndex))
attr_train = attrs[trainIndex,:]
print(attr_train.shape)
attr_test = attrs[testIndex,:]
print(attr_test.shape)

label_train = labels[:,trainIndex].tolist()[0]
print(len(label_train))

label_test = labels[:,testIndex].tolist()[0]
print(len(label_test))
print(np.mat(label_train).reshape((-1,1)).shape)

数据样例:

算法函数:

def train_objective_tune(config, reporter):
    '''
    Tune将通过在计算机上安排多个试验来自动化和分发您的超参数搜索。每个试验都运行一个用户定义的Python函数,其中包含一组采样的超参数。
    :param config(dict):Parameters provided from the search algorithm or variant generation.
    :param reporter(Reporter): Handle to report intermediate metrics to Tune.
    :return: 
    '''
    max_depth = config["max_depth"] + 5
    n_estimators = config['n_estimators'] * 5 + 50
    learning_rate = config["learning_rate"] * 0.02 + 0.05
    subsample = config["subsample"] * 0.1 + 0.7
    min_child_weight = config["min_child_weight"] + 1
    print("max_depth:" + str(max_depth))
    print("n_estimator:" + str(n_estimators))
    print("learning_rate:" + str(learning_rate))
    print("subsample:" + str(subsample))
    print("min_child_weight:" + str(min_child_weight))
    global attr_train, label_train

    xgb = xgboost.XGBClassifier(nthread=4,  # 进程数
                            max_depth=max_depth,  # 最大深度
                            n_estimators=n_estimators,  # 树的数量
                            learning_rate=learning_rate,  # 学习率
                            subsample=subsample,  # 采样数
                            min_child_weight=min_child_weight,  # 孩子数
                            max_delta_step=10,  # 10步不降则停止
                            objective="binary:logistic")

    metric = cross_val_score(xgb, attr_train, label_train, cv=5, scoring="roc_auc").mean()
    print(metric)
    for i in range(100):
        ## reward_attr_mean_accuracy="neg_mean_loss" 是因为我们需要它想准确度一样越来越高
        # timesteps_total="training_iteration"
        reporter(timesteps_total=i, mean_accuracy=-metric) #reporter(mean_loss=test_loss, mean_accuracy=accuracy, metric2=1, metric3=0.3, ...)
        time.sleep(0.02)
    #return -metric

核心代码:


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--smoke-test", action="store_true", help="Finish quickly for testing")
    #parser.add_argument("--checkpoint_dir", type=str, default="./checkpoint_dir", help="Size of first kernel")
    args, _ = parser.parse_known_args()

    ray.init(num_gpus=0,num_cpus=2,redirect_output=True) #,num_workers=2 Exception: The 'num_workers' argument is deprecated. Please use 'num_cpus' instead.

    tune.register_trainable("train_objective_tune", train_objective_tune)

    space = {"max_depth": hp.randint("max_depth", 15),
             "n_estimators": hp.randint("n_estimators", 10),  # [0,1,2,3,4,5] -> [50,]
             "learning_rate": hp.randint("learning_rate", 6),  # [0,1,2,3,4,5] -> 0.05,0.06
             "subsample": hp.randint("subsample", 4),  # [0,1,2,3] -> [0.7,0.8,0.9,1.0]
             "min_child_weight": hp.randint("min_child_weight", 5),  #
             }

    train_spec = {
        "byz_ahbs_xgb_exp": {
            "run": "train_objective_tune",
        "trial_resources": {
            "cpu": 4,
            "gpu": 0,
        },
            "num_samples": 10 if args.smoke_test else 1000, # 这指定了您计划运行的试验次数,从超参数空间中采样的次数,而不是批次的大小。
            "stop": {  # 停止条件(可以指定里面的任何指标)
                "mean_accuracy": 0.18,
                "timesteps_total": 100,
                #"training_iteration": 2,  # 迭代次数
            },
        }
    }

    if args.smoke_test:
        train_spec["config"]["lr"] = 10**-4
        train_spec["config"]["dropout"] = 0.5
    #
    algo = HyperOptSearch(space, max_concurrent=4, reward_attr="mean_accuracy")
    scheduler = AsyncHyperBandScheduler(
        time_attr="training_iteration",
        reward_attr="mean_accuracy",
        max_t=400,
        grace_period=20)
    #tune.run_experiments({"pbt_cifar10": train_spec}, scheduler=scheduler)

    tune.run_experiments(train_spec, search_alg=algo, scheduler=scheduler)

运行状态:

最后得到的每对参数的结果:

 

 

普通的Py调参神器Hyperopt

下面是原生非分布式的Hyperopt(对比、参考、学习)

一、安装

pip install hyperopt

二、说明

Hyperopt提供了一个优化接口,这个接口接受一个评估函数和参数空间,能计算出参数空间内的一个点的损失函数值。用户还要指定空间内参数的分布情况。 
Hyheropt四个重要的因素:指定需要最小化的函数,搜索的空间,采样的数据集(trails database)(可选),搜索的算法(可选)。 
首先,定义一个目标函数,接受一个变量,计算后返回一个函数的损失值,比如要最小化函数q(x,y) = x**2 + y**2

指定搜索的算法,算法也就是hyperopt的fmin函数的algo参数的取值。当前支持的算法由随机搜索(对应是hyperopt.rand.suggest),模拟退火(对应是hyperopt.anneal.suggest),TPE算法。

关于参数空间的设置,比如优化函数q,输入fmin(q,space=hp.uniform(‘a’,0,1)).hp.uniform函数的第一个参数是标签,每个超参数在参数空间内必须具有独一无二的标签。hp.uniform指定了参数的分布。其他的参数分布比如 


hp.choice返回一个选项,选项可以是list或者tuple.options可以是嵌套的表达式,用于组成条件参数。 
hp.pchoice(label,p_options)以一定的概率返回一个p_options的一个选项。这个选项使得函数在搜索过程中对每个选项的可能性不均匀。 
hp.uniform(label,low,high)参数在low和high之间均匀分布。 
hp.quniform(label,low,high,q),参数的取值是round(uniform(low,high)/q)*q,适用于那些离散的取值。 
hp.loguniform(label,low,high)绘制exp(uniform(low,high)),变量的取值范围是[exp(low),exp(high)] 
hp.randint(label,upper) 返回一个在[0,upper)前闭后开的区间内的随机整数。 

搜索空间可以含有list和dictionary.

from hyperopt import hp
list_space = [
hp.uniform(’a’, 0, 1),
hp.loguniform(’b’, 0, 1)]
tuple_space = (
hp.uniform(’a’, 0, 1),
hp.loguniform(’b’, 0, 1))
dict_space = {
’a’: hp.uniform(’a’, 0, 1),
’b’: hp.loguniform(’b’, 0, 1)}

 

三、简单例子

from hyperopt import  hp,fmin, rand, tpe, space_eval

def q (args) :
    x, y = args
    return x**2-2*x+1 + y**2

space = [hp.randint('x', 5), hp.randint('y', 5)]

best = fmin(q,space,algo=rand.suggest,max_evals=10)

print(best)

输出:

{'x': 2, 'y': 0}

四、xgboost举例

xgboost具有很多的参数,把xgboost的代码写成一个函数,然后传入fmin中进行参数优化,将交叉验证的auc作为优化目标。auc越大越好,由于fmin是求最小值,因此求-auc的最小值。所用的数据集是202列的数据集,第一列样本id,最后一列是label,中间200列是属性。

#coding:utf-8
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import xgboost as xgb
from random import shuffle
from xgboost.sklearn import XGBClassifier
from sklearn.cross_validation import cross_val_score
import pickle
import time
from hyperopt import fmin, tpe, hp,space_eval,rand,Trials,partial,STATUS_OK

def loadFile(fileName = "E://zalei//browsetop200Pca.csv"):
    data = pd.read_csv(fileName,header=None)
    data = data.values
    return data

data = loadFile()
label = data[:,-1]
attrs = data[:,:-1]
labels = label.reshape((1,-1))
label = labels.tolist()[0]

minmaxscaler = MinMaxScaler()
attrs = minmaxscaler.fit_transform(attrs)

index = range(0,len(label))
shuffle(index)
trainIndex = index[:int(len(label)*0.7)]
print len(trainIndex)
testIndex = index[int(len(label)*0.7):]
print len(testIndex)
attr_train = attrs[trainIndex,:]
print attr_train.shape
attr_test = attrs[testIndex,:]
print attr_test.shape
label_train = labels[:,trainIndex].tolist()[0]
print len(label_train)
label_test = labels[:,testIndex].tolist()[0]
print len(label_test)
print np.mat(label_train).reshape((-1,1)).shape


def GBM(argsDict):
    max_depth = argsDict["max_depth"] + 5
    n_estimators = argsDict['n_estimators'] * 5 + 50
    learning_rate = argsDict["learning_rate"] * 0.02 + 0.05
    subsample = argsDict["subsample"] * 0.1 + 0.7
    min_child_weight = argsDict["min_child_weight"]+1
    print "max_depth:" + str(max_depth)
    print "n_estimator:" + str(n_estimators)
    print "learning_rate:" + str(learning_rate)
    print "subsample:" + str(subsample)
    print "min_child_weight:" + str(min_child_weight)
    global attr_train,label_train

    gbm = xgb.XGBClassifier(nthread=4,    #进程数
                            max_depth=max_depth,  #最大深度
                            n_estimators=n_estimators,   #树的数量
                            learning_rate=learning_rate, #学习率
                            subsample=subsample,      #采样数
                            min_child_weight=min_child_weight,   #孩子数
                            max_delta_step = 10,  #10步不降则停止
                            objective="binary:logistic")

    metric = cross_val_score(gbm,attr_train,label_train,cv=5,scoring="roc_auc").mean()
    print metric
    return -metric

space = {"max_depth":hp.randint("max_depth",15),
         "n_estimators":hp.randint("n_estimators",10),  #[0,1,2,3,4,5] -> [50,]
         "learning_rate":hp.randint("learning_rate",6),  #[0,1,2,3,4,5] -> 0.05,0.06
         "subsample":hp.randint("subsample",4),#[0,1,2,3] -> [0.7,0.8,0.9,1.0]
         "min_child_weight":hp.randint("min_child_weight",5), #
        }
algo = partial(tpe.suggest,n_startup_jobs=1)
best = fmin(GBM,space,algo=algo,max_evals=4)#max_evals表示想要训练的最大模型数量,越大越容易找到最优解

print best
print GBM(best)

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐