上一篇博文:R语言中的代码运算性能提升

R语言运行在CPU单核单线程上,使用并行计算原因是程序运行时间太长。大部分程序都可以进行并行化改造以提高运算性能

1.lapply

只需要一个参数(list\vector\array\matrix\data.frame),和一个以该参数为输入的函数,函数返回列表list

lapply(1:3/3, round, digits=3);
[[1]] 
[1] 
0.333 
[[2]] 
[1] 
0.667 
[[3]] 
[1] 
1

2.parallel

1.基本概述

在同一个CPU上利用多个核同时运算相同函数,parallel首先初始化一个集群,集群数量最好是CPU核数-1。如果一台8核建立数量=8集群,那CPU就干不了其他事情

由于parallel包函数使用Rscript调用方式,对象被复制多份(多核),因此内存占用较多,在大数据条件就要谨慎使用

library(parallel)
#Calculate the number of cores检查电脑当前可用核数
no_cores<-detectCores(logical=F)		#F-物理CPU核心数/T-逻辑CPU核心数

#Initiate cluster发起集群,同时创建数个R进行并行计算
#只是创建待用的核,而不是并行运算环境
cl<-makeCluster(no_cores)

#现只需要使用并行化版本的lapply,parLapply就可以
parLapply(cl, 1:10000,function(exponent) 2^exponent)
#当结束后要关闭集群,否则电脑内存会始终被R占用
stopCluster(cl)

2.Parallel变量作用域

在Mac/Linux系统中使用 makeCluster(no_core, type="FORK")选项从而当并行运行时可包含所有环境变量

在Windows中由于使用的是Parallel Socket Cluster (PSOCK),每个集群只加载base包,所以运行时要指定加载特定的包或变量

cl<-makeCluster(no_cores)
base<-2					#特定变量
clusterExport(cl, "base")	#将base变量加载到集群中,导入多个c("a","b","c")
parLapply(cl, 2:4, function(exponent)  base^exponent)
stopCluster(cl)
##############################
clusterExport(cl=NULL,varlist,envir=.GlobalEnv)		#varlist-要导入的对象名称(字符向量)

clusterEvalQ(cl,expr)利用创建的cl核执行expr命令语句(若命令太长可写到文件中,<-)

clusterEvalQ(cl,source(file="code.r"))

在函数中使用一些其他包就要使用clusterEvalQ加载,比如使用rms,要用clusterEvalQ(cl, library(rms))。要注意的是在clusterExport加载进某些变量后,这些变量的任何变化都会被忽略

cl<-makeCluster(no_cores)
base=2 
clusterExport(cl, "base")		#加载base变量
base <- 4   					#变量值发生变化
parLapply(cl, 2:4, function(exponent) base^exponent) 
# Finish 
stopCluster(cl) 
[[1]] 
[1] 
4 
[[2]] 
[1] 
8 
[[3]] 
[1] 
16

3.parSapply

让程序返回向量|矩阵而不是列表,那么就应该使用sapply,同样也有并行版本parSapply

par开头族函数与apply函数族用法基本一样,还有parApply/parRapply/parCapply等

parSapply(cl = NULL, X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE)

parSapply(cl, 2:4, function(exponent)  base^exponent)
[1]  4  8 16
#输出矩阵并显示行名和列名(因此才需要使用as.character,如果不做转化,将是矩阵默认列名)
parSapply(cl, as.character(2:4),  function(exponent){
	x <- as.numeric(exponent)
	c(base = base^x, self = x^x)
 })
2  3   4
base 4  8  16
self 4 27 256

3.foreach

支持并行运算的扩展包,发挥多核计算优势

1.基本概述

设计foreach思想可能是要创建一个lapply和for循环的标准,初始化过程有些不同,需要register注册集群

后面的表达式尽量用{}括起来

%do%-执行单进程任务,即便启动多进程环境也是徒劳

%dopar%-多进程任务

library(foreach)
library(doParallel)			#doParallel适合Windows/Linux/Mac
no_cores<-detectCores(logical=F)
cl<-makeCluster(no_cores) 		#先发起集群
registerDoParallel(cl)			#再进行登记注册
#最后结束集群
stopImplicitCluster()			#停止隐式集群
stopCluster()


#foreach函数可使用参数.combine控制结果汇总方法
base=2						#不需要将base变量加载到集群中
foreach(exponent = 2:4, .combine = c)  %dopar%   base^exponent
  [1]  4  8 16

#数据框
foreach(exponent = 2:4, .combine = rbind)  %dopar%   base^exponent
    [,1]
result.1    4
result.2    8
result.3   16
foreach(exponent = 2:4, .combine = list, .multicombine = TRUE)  %dopar%   base^exponent
[[1]]
[1] 4
[[2]]
[1] 8
[[3]]
[1] 16
#最后list-combine方法是默认的.这个例子中用到.multicombine参数可避免未知的嵌套列表foreach(exponent = 2:4, .combine = list)  %dopar%  base^exponent
[[1]]
[[1]][[1]]
[1] 4
[[1]][[2]]
[1] 8
[[2]]
[1] 16

2.foreach中变量作用域

foreach中变量作用域有些不同,它会自动加载本地环境(不能直接调用上层环境)到函数

base<-2
cl<-makeCluster(2)
registerDoParallel(cl)
foreach(exponent = 2:4, .combine = c)  %dopar% base^exponent
stopCluster(cl)
[1]  4  8 16

#但对于父环境变量则不会加载
test<-function(exponent) {
foreach(exponent = 2:4, .combine = c)  %dopar% base^exponent
}
test()
Error in base^exponent : task 1 failed - "object 'base' not found" 


#为解决error可使用.export参数而不要使用clusterExport.它可以加载最终版本变量,在函数运行前变量都是可以改变
base<-2
cl<-makeCluster(2)
registerDoParallel(cl)
base<-4
test<-function (exponent) {
  foreach(exponent = 2:4, .combine = c, .export = "base")  %dopar%  
  base^exponent
}
test()
stopCluster(cl)
[1]  16  64 256
#类似可使用.packages参数来加载包(非系统安装包),比如.packages = c("rms", "mice")

3.使用Fork|Sork

若主要在windows上做分析,也习惯使用PSOCK。对使用其他系统的人要意识到两者区别

FORK:"to divide in branches and go separate ways"
系统:Unix/Mac (not Windows) 
环境:所有

PSOCK:并行socket集群 
系统:All (including Windows) 
环境:空

4.内存控制

#如果不打算使用windows系统,建议尝试FORK模式,可实现内存共享从而节省内存
PSOCK:
library(pryr)
no_cores<-detectCores(logical=F)
cl<-makeCluster(no_cores)
clusterExport(cl,"a")
clusterEvalQ(cl,library(pryr))
#address检查R对象的内部属性
parSapply(cl, X = 1:10, function(x) address(a)) == address(a)
#输出结果为FLASE说明没有实现内存共享
[1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE

FORK:
cl<-makeCluster(no_cores, type="FORK")
parSapply(cl, X = 1:10, function(x) address(a)) == address(a)
#输出结果为TRUE说明实现内存共享
[1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE


b<-0
clusterExport(cl,"b")
parSapply(cl, X = 1:10, function(x) {b<-b + 1; b})
[1]
1 1 1 1 1 1 1 1 1 1
parSapply(cl, X = 1:10, function(x) {b <<- b + 1; b})	#两个核心集群
[1] 1 2 3 4 5 1 2 3 4 5
b
[1] 0

5.程序调试

并行环境中debug很困难,不能使用browser/cat/print函数来发现问题

tryCatch-list:stop函数不是好方法,因为当程序运行1-2天后突然弹出错误,就只因为这一个错误程序终止,并把之前做的计算全部扔掉,这是不合适的。为此可使用tryCatch捕捉错误,从而使出现错误后程序还能继续执行

foreach(x=list(1,2,"a"),.combine=list)  %dopar%  
{
  tryCatch({
    c(1/x, x, 2^x)
  }, error = function(e) return(paste0("The variable '", x, "'", 
                                      " caused the error: '", e, "'")))
}
[[1]]
[1] 1 1 2
[[2]]
[1] 0.5 2.0 4.0
[[3]]
[1] "The variable 'a' caused the error: 'Error in 1/x: non-numeric argument to binary operator\n'"

创建文件输出:当无法在控制台观测每个工作时,可设置共享文件,让结果输出到文件中

cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  print(x)
}
stopCluster(cl)
#debug文件输出,当代码出现错误时不会出现以下信息
starting worker pid=7392 on localhost:11411 at 00:11:21.077
starting worker pid=7276 on localhost:11411 at 00:11:21.319
starting worker pid=7576 on localhost:11411 at 00:11:21.762

创建结点专用文件:若数据集存在一些问题时,可以方便观测

cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  cat(dput(x), file = paste0("debug_file_", x, ".txt"))
} 
stopCluster(cl)

DEBUG日志输出文件

library(foreach)
library(doParallel)			
no_cores<-detectCores(logical=F)
cl<-makeCluster(no_cores,outfile="debug.txt") 		
registerDoParallel(cl)			

ceshi<-function(x){
	a<-tryCatch(
		{
		c(1/x, x, 2^x)
		},error=function(e) 
			return(paste0("The variable '", x, "'", " caused the error: '", e, "'"))
	)
	cat(x,'-----',a,file=paste0("debug_file_", x, ".txt"))	#不同节点必须写入不同文件
	return(a)
}

foreach(x=list(1,2,"a",3,0,'b',4),.combine=c) %dopar% ceshi(x)

stopImplicitCluster()
stopCluster(cl)

6.任务载入、载入平衡

无论parLapply还是foreach都是包装(wrapper)函数,意味着它们不是直接执行并行计算代码,而是依赖其他函数实现的。在parLapply中的定义如下:

parLapply <- function (cl = NULL, X, fun, ...) 
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl, x = splitList(X, length(cl)), 
        fun = lapply, fun, ...), quote = TRUE)
}

splitList(X,length(cl)) 将任务分割成多个部分,然后将其发送到不同集群中。如果有很多cache或者存在一个任务比其他worker中任务都大,那么在这个任务结束前,其他提前结束的worker都会处于空闲状态。为避免这一情况需要将任务尽量平均分配给每个worker。举个例子,若需要计算优化神经网络的参数,这一过程可并行地以不同参数来训练神经网络

# From the nnet example
parLapply(cl, c(10, 20, 30, 40, 50), function(neurons) 
  nnet(ir[samp,], targets[samp,], size = neurons))
改为:顺序调整,分配更加合理
# From the nnet example
parLapply(cl, c(10, 50, 30, 40, 20), function(neurons) 
  nnet(ir[samp,], targets[samp,], size = neurons))

7.内存载入

在大数据情况下使用并行计算会很快出现问题。因为使用并行计算会极大消耗内存,必须注意不要让R运行内存到达内存上限,否则将会导致崩溃或非常缓慢。使用Forks是控制内存上限的重要方法,Fork通过内存共享实现,而不需要额外的内存空间,这对性能的影响是很显著的

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐