pyspark模型训练
1、pyspark启动正常情况pyspark shell的启动成功后的界面:[admin@datacenter4 ~]$ pysparkPython 2.7.5 (default, Nov 16 2020, 22:23:17)[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2Type "help", "copyright", "credits" or
1、pyspark启动
正常情况pyspark shell的启动成功后的界面:
[admin@datacenter4 ~]$ pyspark
Python 2.7.5 (default, Nov 16 2020, 22:23:17)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.2.3.1.4.0-315
/_/
Using Python version 2.7.5 (default, Nov 16 2020 22:23:17)
SparkSession available as 'spark'.
2、pyspark shell脚本测试
2.1 sc的生成方式
2.1.1 正常启动pyspark shell
这时候不能用structure_1()中的方式生成sc,会报pyspark shell已创建了sc,不能重复生成,也就是这种情况下默认是可以直接适用sc变量的。
[admin@datacenter4 ~]$ pyspark
Python 2.7.5 (default, Nov 16 2020, 22:23:17)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.2.3.1.4.0-315
/_/
Using Python version 2.7.5 (default, Nov 16 2020 22:23:17)
SparkSession available as 'spark'.
>>>
>>> import numpy
>>> from pyspark.mllib.fpm import PrefixSpan
>>> data = [[["a", "b"], ["c"]],[["a"], ["c", "b"], ["a", "b"]],[["a", "b"], ["e"]],
[["f"]]]
>>> # 这里的sc用的默认的
>>> rdd = sc.parallelize(data)
备注:正常启动下,如果还想自定sc,可以用structure_2()中的方式生成sc
2.1.2 非正常启动pyspark shell
pyspark shell启动还未显示“spark”标志就被强制终止,有时会仍然进入pyspark指令页面,如下:
[admin@datacenter3 site-packages]$ pyspark
Python 2.7.5 (default, Aug 7 2019, 00:51:29)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
22/03/30 10:16:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
# 这里ctrl+c强制退出,可以看到最后仍出现">>>",说明仍然进入pyspark指令页面
^CTraceback (most recent call last):
File "/usr/hdp/current/spark2-client/python/pyspark/shell.py", line 45, in <module>
spark = SparkSession.builder\
File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 173, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 358, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 119, in __init__
conf, jsc, profiler_cls)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 181, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 297, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1523, in __call__
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1152, in send_command
File "/usr/lib64/python2.7/socket.py", line 447, in readline
data = self._sock.recv(self._rbufsize)
KeyboardInterrupt
>>>
>>> import numpy
这种情况下,默认的sc是还没有生成的,直接调用sc会报错,此时structure_1()、structure_2()都可以用来生成sc。
2.2 模型训练-代码测试
# Autor chenfeng
#!/usr/bin/env Python
# coding=utf-8
def structure_1():
#计算文件中包含a和b的行数
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("My App")#配置环境信息
sc = SparkContext(conf=conf)#创建指挥官
# logFile = "file:///usr/local/spark/README.md" #注意本地file文件需要///
return sc
'''
以下代码来源 https://blog.csdn.net/u013719780/article/details/51822346
'''
def structure_2():
from pyspark.sql import SparkSession, HiveContext
spark = SparkSession.builder.master("local[*]")\
.appName("sdl-test")\
.config("spark.executor.memory", "3g")\
.config("spark.driver.memory","1g")\
.enableHiveSupport()\
.getOrCreate()
sc = spark.sparkContext
sql = spark.sql
hive_sql = HiveContext(sc)
print(sc.master)
return sc
# !!!!!!!!spark-shell模式,这里可以不用写;集群命令提交模式,需要放开!!!!!!!!
#sc = structure_1()
# 这里如果是本地文件,需要以‘file///’开头,否则默认就是hdfs地址
path = '/cf_sdl/hour_noheader.csv'
raw_data = sc.textFile(path)
num_data = raw_data.count()
records = raw_data.map(lambda x: x.split(','))
first = records.first()
print('数据的第一行:', first)
print('数据样本数:', num_data)
# 因为变量records下文经常要用到,此处对其进行缓存:
records.cache()
# 为了将类型特征表示成二维形式,我们将特征值映射到二元向量中非0的位置。下面定义这样一个映射函数:
def get_mapping(rdd, idx):
return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()
print('第三个特征的类别编码: %s '%get_mapping(records,2))
mappings = [get_mapping(records, i) for i in range(2,10)] #对类型变量的列(第2~9列)应用映射函数
print('类别特征打编码字典:',mappings)
cat_len = sum(map(len,[i for i in mappings])) #类别特征的个数
#cat_len = sum(map(len, mappings))
#print(map(len,mappings)
num_len = len(records.first()[11:15]) #数值特征的个数
total_len = num_len+cat_len #所有特征的个数
print('类别特征的个数: %d'% cat_len)
print('数值特征的个数: %d'% num_len)
print('所有特征的个数::%d' % total_len)
# 1.1 为线性模型创建特征向量
from pyspark.mllib.regression import LabeledPoint
import numpy as np
def extract_features(record):
cat_vec = np.zeros(cat_len)
step = 0
for i,raw_feature in enumerate(record[2:9]):
dict_code = mappings[i]
index = dict_code[raw_feature]
cat_vec[index+step] = 1
step = step+len(dict_code)
num_vec = np.array([float(raw_feature) for raw_feature in record[10:14]])
return np.concatenate((cat_vec, num_vec))
def extract_label(record):
return float(record[-1])
data = records.map(lambda point: LabeledPoint(extract_label(point),extract_features(point)))
first_point = data.first()
print('原始特征向量:' +str(first[2:]))
print('标签:' + str(first_point.label))
print('对类别特征进行独热编码之后的特征向量: \n' + str(first_point.features))
print('对类别特征进行独热编码之后的特征向量长度:' + str(len(first_point.features)))
# 1.2 为决策树创建特征向量
def extract_features_dt(record):
# return np.array(map(float, record[2:14])) # python2.x
return np.array(record[2:14]) # python3.x
data_dt = records.map(lambda point: LabeledPoint(extract_label(point), extract_features_dt(point)))
first_point_dt = data_dt.first()
print('决策树特征向量: '+str(first_point_dt.features))
print('决策树特征向量长度: '+str(len(first_point_dt.features)))
# 2 模型的训练和应用
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
# help(LinearRegressionWithSGD.train)
# 2.1 在bike sharing 数据上训练回归模型
linear_model = LinearRegressionWithSGD.train(data, iterations=10, step=0.1, intercept =False)
true_vs_predicted = data.map(lambda point:(point.label,linear_model.predict(point.features)))
print('线性回归模型对前5个样本的预测值: '+ str(true_vs_predicted.take(5)))
# 2.2 决策树
dt_model = DecisionTree.trainRegressor(data_dt,{})
preds = dt_model.predict(data_dt.map(lambda p: p.features))
actual = data.map(lambda p:p.label)
true_vs_predicted_dt = actual.zip(preds)
print('决策树回归模型对前5个样本的预测值: '+str(true_vs_predicted_dt.take(5)))
print('决策树模型的深度: ' + str(dt_model.depth()))
print('决策树模型的叶子节点个数: '+str(dt_model.numNodes()))
# 3 评估回归模型的性能
# 3.1 均方误差和均方根误差
def squared_error(actual, pred):
return (pred-actual)**2
# 3.2 平均绝对误差
def abs_error(actual, pred):
return np.abs(pred-actual)
# 3.3 均方根对数误差
def squared_log_error(pred, actual):
return (np.log(pred+1)-np.log(actual+1))**2
# 3.5 计算不同度量下的性能
# 3.5.1 线性模型
# mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean() #python2.x
mse = true_vs_predicted.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
mae = true_vs_predicted.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
rmsle = np.sqrt(true_vs_predicted.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
print('Linear Model - Mean Squared Error: %2.4f' % mse)
print('Linear Model - Mean Absolute Error: %2.4f' % mae)
print('Linear Model - Root Mean Squared Log Error: %2.4f' % rmsle)
# 3.5.2 决策树
mse_dt = true_vs_predicted_dt.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
mae_dt = true_vs_predicted_dt.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
rmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
print('Decision Tree - Mean Squared Error: %2.4f' % mse_dt)
print('Decision Tree - Mean Absolute Error: %2.4f' % mae_dt)
print('Decision Tree - Root Mean Squared Log Error: %2.4f' %rmsle_dt)
# 4 改进模型性能和参数调优
import matplotlib
from matplotlib.pyplot import hist
import numpy as np
import matplotlib.pyplot as plt
targets = records.map(lambda r: float(r[-1])).collect()
# hist(targets, bins=40, color='lightblue', normed=True) # python2.x
hist(targets, bins=40, color='lightblue')
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 6)
plt.show()
# 绘制对目标变量进行对数变换后的分布直方图。
log_targets = records.map(lambda r : np.log(float(r[-1]))).collect()
# plt.hist(log_targets, bins = 40, color ='lightblue', normed =True)
plt.hist(log_targets, bins = 40, color ='lightblue')
fig = plt.gcf()
fig.set_size_inches(12,6)
plt.show()
# 绘制对目标变量进行平方根变换后的分布直方图。
sqrt_targets = records.map(lambda r: np.sqrt(float(r[-1]))).collect()
plt.hist(sqrt_targets, bins=40, color='lightblue')
# plt.hist(sqrt_targets, bins=40, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 6)
plt.show()
# 4.1 目标变量变换对模型的影响
# 线性回归
data_log = data.map(lambda lp:LabeledPoint(np.log(lp.label),lp.features))
model_log =LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)
true_vs_predicted_log = data_log.map(lambda p:(np.exp(p.label),np.exp(model_log.predict(p.features))))
#计算模型的MSE,MAE,RMSLE
mse_log = true_vs_predicted_log.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
mae_log = true_vs_predicted_log.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
rmsle_log = np.sqrt(true_vs_predicted_log.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
print('Linear Model —— Mean Squared Error:%2.4f'% mse_log)
print('Linear Model —— Mean Absolue Error:%2.4f'% mae_log)
print('Linear Model —— Root Mean Squared Log Error:%2.4f'% rmsle_log)
print('Linear Model —— Non log-transformed predictions:\n'+ str(true_vs_predicted.take(3)))
print('Linear Model —— Log-transformed predictions:\n'+ str(true_vs_predicted_log.take(3)))
# 决策树
data_dt_log = data_dt.map(lambda lp:LabeledPoint(np.log(lp.label), lp.features))
dt_model_log = DecisionTree.trainRegressor(data_dt_log,{})
preds_log = dt_model_log.predict(data_dt_log.map(lambda p:p.features))
actual_log = data_dt_log.map(lambda p: p.label)
true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda t_p:(np.exp(t_p[0]), np.exp(t_p[1])))
#计算模型的MSE,MAE,RMSLE
mse_log_dt = true_vs_predicted_dt_log.map(lambda t_p: squared_error(t_p[0], t_p[1])).mean()
mae_log_dt = true_vs_predicted_dt_log.map(lambda t_p: abs_error(t_p[0], t_p[1])).mean()
rmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda t_p: squared_log_error(t_p[0], t_p[1])).mean())
print('Decision Tree —— Mean Squared Error:%2.4f'% mse_log_dt)
print('Decision Tree —— Mean Absolue Error:%2.4f'% mae_log_dt)
print('Decision Tree —— Root Mean Squared Log Error:%2.4f'% rmsle_log_dt)
print('Decision Tree —— Non log-transformed predictions:\n'+ str(true_vs_predicted_dt.take(3)))
print('Decision Tree —— Log-transformed predictions:\n'+str(true_vs_predicted_dt_log.take(3)))
# 4.2 模型参数调优
'''
到目前为止,本文讨论了同一个数据集上对MLlib中的回归模型进行训练和评估的基本概率。接下来,我们使用交叉验证方法来评估不同参数对模型性能的影响。
首先,我们将原始数据按比率划分为train,test数据集,原书当中pyspark版本还没有randomSplit这个函数,所以用如下的方式处理:
'''
# 4.2.1 线性回归
data_with_idx = data.zipWithIndex().map(lambda k_v: (k_v[1],k_v[0]))
test = data_with_idx.sample(False,0.2,42)
train = data_with_idx.subtractByKey(test)
train_test_data_split = data.randomSplit([0.8,0.2],123)
train = train_test_data_split[0]
test = train_test_data_split[1]
print('测试集的样本数:',test.count())
print('训练集的样本数:',train.count())
# 决策树
train_test_data_dt_split = data_dt.randomSplit([0.8,0.2],123)
train_dt = train_test_data_dt_split[0]
test_dt = train_test_data_dt_split[1]
'''
前面已经得到了训练集和测试集,下面研究不同参数设置对模型性能的影响,首先需要为线性模型设置一个评估方法,
同时创建一个辅助函数,实现在不同参数设置下评估训练集和测试集上的性能。
本文依然使用Kaggle竞赛中的RMSLE作为评价指标。这样可以和在竞赛排行榜的成绩进行比较。
'''
# 评估函数定义如下:
def evaluate(train, test, iterations, step, regParam, regType, intercept):
model =LinearRegressionWithSGD.train(train, iterations, step, regParam=regParam,
regType=regType,intercept=intercept)
testLabel_vs_testPrediction = test.map(lambda point:(point.label, model.predict(point.features)))
rmsle = np.sqrt(testLabel_vs_testPrediction.map(lambda t_p:squared_log_error(t_p[0],t_p[1])).mean())
return rmsle
# 4.2.1.1 迭代次数对模型的影响:
params = [1, 5, 10, 20, 50, 100]
metrics = [evaluate(train, test, param, 0.01, 0.0, 'l2', False) for param in params]
print(params)
print(metrics)
# 绘制迭代次数与RMSLE的关系图:
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 6)
plt.xscale('log')
plt.show()
# 4.2.1.2 评估step对模型的影响
'''
从结果可以看出为什么不使用默认步长(默认1.0)来训练线性模型,因为其得到的RMSLE结果为nan。这说明SGD模型收敛到了最差的局部最优解。这种情况在步长较大的时候容易出现,原因是算法收敛太快导致不能得到最优解。
另外,小的步长与相对较小的迭代次数(比如上面的10次)对应的训练模型性能一般较差,而较小的步长与较大的迭代次数通常可以收敛到较好的结果。
'''
params=[0.01,0.025,0.05,0.1,0.5,1.0]
metrics =[evaluate(train, test,10,param,0.0,'l2',False)for param in params]
for i in range(len(params)):
print('the rmsle:%f when step :%f'%(metrics[i],params[i]))
#绘制步长与RMSLE的关系图:
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 6)
plt.xscale('log')
plt.xlabel('step')
plt.ylabel('RMSLE')
plt.show()
# 4.2.1.3 不同正则化系数对模型的影响
'''
我们知道随着正则化的提高,训练集的预测性能会下降,因为模型不能很好的拟合数据。
但是我们希望设置合适的正则化参数,能够在测试集上达到最好的性能,最终得到一个泛化能力最优的模型。
'''
# (1) 先看L2正则化系数对模型的影响
params=[0.0,0.01,0.1,1.0,5.0,10.0,20.0]
metrics =[evaluate(train, test,10,0.1, param,'l2',False) for param in params]
for i in range(len(params)):
print('the rmsle:%f when regParam :%f'%(metrics[i],params[i]))
#绘制L2正则化系数与RMSLE的关系图:
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 8)
plt.xscale('log')
plt.xlabel('regParam')
plt.ylabel('RMSLE')
plt.show()
# (2) 再看L1正则化系数对模型的影响
params=[0.0,0.01,0.1,1.0,10.0,100.0,1000.0]
metrics =[evaluate(train, test,10,0.1, param,'l1',False) for param in params]
for i in range(len(params)):
print('the rmsle:%f when regParam :%f'%(metrics[i],params[i]))
#绘制L2正则化系数与RMSLE的关系图:
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 8)
plt.xscale('log')
plt.xlabel('regParam')
plt.ylabel('RMSLE')
plt.show()
'''
从上图可以看到,当使用一个较大的正则化参数时,RMSLE性能急剧下降。
想必大家都知道,使用L1正则化可以得到稀疏的权重向量,我们看看刚刚得到的L1正则化模型是否真是如此呢?
从下面的结果可以看到,与我们预料的基本一致。随着L1正则化参数越来越大,模型的权重向量中0的个数越来越多。
'''
model_l1 = LinearRegressionWithSGD.train(train,10,0.1,regParam=1.0, regType='l1', intercept=False)
model_l2 = LinearRegressionWithSGD.train(train,10,0.1,regParam=1.0, regType='l2', intercept=False)
model_l1_10 = LinearRegressionWithSGD.train(train,10,0.1,regParam=10.0, regType='l1', intercept=False)
model_l2_10 = LinearRegressionWithSGD.train(train,10,0.1,regParam=10.0, regType='l2', intercept=False)
model_l1_100 = LinearRegressionWithSGD.train(train,10,0.1,regParam=100.0, regType='l1', intercept=False)
model_l2_100 = LinearRegressionWithSGD.train(train,10,0.1,regParam=100.0, regType='l2', intercept=False)
# model_l1.weights.array 把DenseVector转array
print('L2 (1.0) number of zero weights:'+ str(sum(model_l1.weights.array == 0))) # 这里可以正常运行
print('L2 (1.0) number of zero weights:'+ str(sum(model_l2.weights.array == 0)))
print('L1 (10.0) number of zeros weights:'+ str(sum(model_l1_10.weights.array == 0)))
print('L2 (10.0) number of zeros weights:'+ str(sum(model_l2_10.weights.array == 0)))
print('L1 (100.0) number of zeros weights:'+ str(sum(model_l1_100.weights.array == 0)))
print('L2 (100.0) number of zeros weights:'+ str(sum(model_l2_100.weights.array == 0)))
# 4.2.1.4 截距对模型的影响
params=[False, True]
metrics =[evaluate(train, test, 10, 0.1, 1.0,'l2', param) for param in params]
for i in range(len(params)):
print('the rmsle:%f when intercept:%f'%(metrics[i],params[i]))
#绘制L2正则化系数与RMSLE的关系图:
plt.bar(params, metrics, color='r')
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 8)
plt.xlabel('intercept')
plt.ylabel('RMSLE')
plt.show()
# 4.2.2 决策树
def evaluate_dt(train, test, maxDepth, maxBins):
model =DecisionTree.trainRegressor(train,{},impurity='variance', maxDepth=maxDepth, maxBins=maxBins)
predictions = model.predict(test.map(lambda point: point.features))
actual = test.map(lambda point: point.label)
actual_vs_predictions = actual.zip(predictions)
rmsle = np.sqrt(actual_vs_predictions.map(lambda t_p: squared_log_error(t_p[0],t_p[1])).mean())
return rmsle
# 4.2.2.1 树的不同最大深度对性能影响:
'''
我们通常希望用更复杂(更深)的决策树提升模型的性能。而较小的树深度类似正则化形式,
如线性模型的L2正则化和L1正则化,存在一个最优的树深度能在测试集上获得最优的性能。
'''
params=[1,2,3,4,5,10,20]
metrics =[evaluate_dt(train_dt, test_dt, param,32) for param in params]
for i in range(len(params)):
print('the rmsle:%f when maxDepth :%d'%(metrics[i],params[i]))
#绘制树的最大深度与RMSLE的关系图:
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 8)
plt.xlabel('maxDepth')
plt.ylabel('RMSLE')
plt.show()
# 4.2.2.2 最大划分数(每个节点分支时最大bin数)对模型的影响
'''
最后,我们来看看划分数对决策树性能的影响。和树的深度一样,更多的划分数会使模型变得更加复杂,并且有助于提升特征维度较大的模型性能。划分数到一定程度之后,对性能的提升帮助不大。
实际上, 由于过拟合的原因会导致测试集的性能变差。
从结果可以看出,最大划分数会影响模型的性能,但是当最大划分数达到30之后,模型性能基本上没有获得提升。最优的最大划分数是在30到35之间。
'''
params=[2,4,8,16,32,64,100]
metrics =[evaluate_dt(train_dt, test_dt,5, param) for param in params]
for i in range(len(params)):
print('the rmsle:%f when maxBins :%d'%(metrics[i],params[i]))
#绘制树的最大划分数与RMSLE的关系图:
plt.plot(params, metrics)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(12, 8)
plt.xlabel('maxDepth')
plt.ylabel('RMSLE')
plt.show()
3、集群提交python文件
把2.2中的代码写在spark-test.py文件,并上传到spark集群中的某台上(目录:/root/cf_temp/spark_test.py)
在这台机器上执行:(每个参数的含义可以看“大数据相关”收藏夹里有关spark的文档)
nohup /usr/hdp/current/spark2-client/bin/spark-submit --master yarn \
--num-executors 6 \ # 集群所有机器总共的executor数量
--executor-memory 1g \ # 每个executor的内存
--executor-cores 2 \ # 每个executor执行task的最大并行度
--driver-memory 1G \ # 提交当前application的driver占有的内存
--driver-cores 1G \ # 提交当前application的driver占有的cpu cores个数
/root/cf_temp/spark_test.py &
备注:
在spark平台上执行Python算法涉及到Python程序的改写,其中import部分需要额外注意。如果我们在执行某个 test.py 程序时需要调用另外一个 common.py,需要在 test.py 内部添加 import common ,而此时的 import common 不能放在程序头部,需要放在context之后。同时在执行程序时需要--py-files 参数指定引用模块的位置。
nohup /usr/hdp/current/spark2-client/bin/spark-submit --master yarn \
--num-executors 6 \
--executor-memory 1g \
--executor-cores 2 \
--driver-memory 1G \
--driver-cores 1G \
--py-files /xx/xx/common.py \
/xx/xx/test.py &
更多推荐
所有评论(0)