MR工作流程

JOB提交

客户端提交Job.waitForCompletion
Job.submit
setUseNewAPI()适配API
connect()建立连接为YARNåå
此时状态为JobState.DEFINE
submitter.submitJobInternal
这里面有几个流程
###判断输出路径是否存在
checkSpecs(job)
每次忘了删除输出目录报错

if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir + 
                                           " already exists");
    }

创建staging路径

JobSubmissionFiles.getStagingDir(cluster, conf)->Cluster.getStagingAreaDir->YARNRunner.getStagingAreaDir->ResourceMgrDelegate.getStagingAreaDir->MRApps.getStagingAreaDir

  private static final String STAGING_CONSTANT = ".staging";
  public static Path getStagingAreaDir(Configuration conf, String user) {
    return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
        MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
        + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
  }

conf.get(“yarn.app.mapreduce.am.staging-dir”,"/tmp/hadoop-yarn/staging")+"/"+.staging
默认路径/tmp/hadoop-yarn/staging/.staging

获取jobId

JobID jobId = submitClient.getNewJobID()
TODO 比较复杂
job相关上传到submitJobDir目录
submitJobDir = new Path(jobStagingArea, jobId.toString())
/tmp/hadoop-yarn/staging/.staging/${jobid}

上传jar包到集群

copyAndConfigureFiles()->JobResourceUploader.uploadResources(job, jobSubmitDir)->JobResourceUploader.uploadResourcesInternal
mkdirs(jtFs, submitJobDir, mapredSysPerms)创建submitJobDir目录
上传逻辑

uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
        fileSCUploadPolicies, statCache);
    uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
        fileSCUploadPolicies, statCache);
    uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
        archiveSCUploadPolicies, statCache);
    uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
    addLog4jToDistributedCache(job, submitJobDir);

计算切片,生成切片规划文件

writeSplits(job, submitJobDir)
使用writeNewSplits(job, jobSubmitDir)
List splits = input.getSplits(job)
FileInputFormat.getSplits

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job))
getFormatMinSplitSize()为1
getMinSplitSize(job)为mapreduce.input.fileinputformat.split.minsize,缺省1
long maxSize = getMaxSplitSize(job)
mapreduce.input.fileinputformat.split.maxsize,缺省MAX_VALUE
循环遍历文件,每个文件单独切片
isSplitable是否支持切割,snappy是不能进行切片的。
获取块大小long blockSize = file.getBlockSize();
computeSplitSize获取切片大小,块大小等于切片大小
Math.max(minSize, Math.min(maxSize, blockSize))
maxsize为最大值,blocksize为128M,取blocksize。和minisize1比,blocksize又是大的。

//SPLIT_SLOP1.1,是如果文件稍微大于blocksize一点点,切出来没意义,就是文件大于blocksize的1.1倍才切。
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
                        //减去分出去的块,继续循环切割
            bytesRemaining -= splitSize;
          }

JobSplitWriter.createSplitFiles形成切片文件
maxsize设置的比blocksize小,会让切片变小,
minsize调的比blocksize大,会让切片变大

向Stag路径写job.XML

writeConf(conf, submitJobFile)
写逻辑在conf.writeXml(out)->TransformerImpl.transform
->transform(source, toHandler, _encoding)->transferOutputProperties->DOM2TO.parse
使用java的dom处理xml

提交YARN

配置上下文

上面事情做完后,调用submitClient.submitJob
ApplicationSubmissionContext
appContext =createApplicationSubmissionContext(conf, jobSubmitDir, ts)
在setupAMCommand设置一堆参数
其中有
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); “org.apache.hadoop.mapreduce.v2.app.MRAppMaster”
把上下文传给RM
resMgrDelegate.submitApplication(appContext)

状态转换

这一块太复杂,就经过一些了状态变化,RM、AM等注册,调用,然后启动了MRAPPMaster
RMAppEventType.START->RMStateStore的STORE_APP->MAppEventType.APP_NEW_SAVED->FifoScheduler的APP_ADDED->RMAppEventType.APP_ACCEPTED->RMAppAttemptEventType.START->FifoScheduler的APP_ATTEMPT_ADDED->RMAppAttemptEventType.ATTEMPT_ADDED->RMNodeEventType.STATUS_UPDATE->FifoScheduler的NODE_UPDATE-> RMAppAttemptEventType.CONTAINER_ALLOCATED->RMAppAttemptState.ALLOCATED_SAVING->RMStateStoreEventType.STORE_APP_ATTEMP ->RMAppAttemptEventType.ATTEMPT_NEW_SAVED ->RMAppAttemptEventType.LAUNCHED->ContainerEventType.INIT_CONTAINER->ContainersLauncherEventType.LAUNCH_CONTAINER

最后
containerLauncher.submit(launch);
ContainerLaunch 的call
exec.activateContainer(containerID, pidFilePath);
就是调用org.apache.hadoop.mapreduce.v2.app.MRAppMaster,main

MRAppMaster

是MapReduce的ApplicationMaster实现,它使得MapReduce计算框架可以运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括创建MapReduce作业,向ResourceManager申请资源,与NodeManage通信要求其启动Container,监控作业的运行状态,当任务失败时重新启动任务等。
进入main方法
继续调用initAndStartAppMaster->appMaster.start()->serviceStart()
这个serviceStart()是实现appMaster的serviceStart方法

  1. 调用createJob()方法创建作业Job实例job
      protected Job createJob(Configuration conf, JobStateInternal forcedState, 
      String diagnostic) {
    
    // 创建一个作业Job实例newJob,其实现为JobImpl  
    Job newJob =
        new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
            taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
            completedTasksFromPreviousRun, metrics,
            committer, newApiCommitter,
            currentUser.getUserName(), appSubmitTime, amInfos, context, 
            forcedState, diagnostic);
            // 将新创建的作业newJob的jobId与其自身的映射关系存储到应用运行上下文信息context中的jobs集合中
    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
    
    

//当作业被创建后,它就被定义了作业完成事件JobFinishEvent的处理器为JobFinishEventHandler dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
```
2. 创建一个Job初始化事件initJobEvent
3. handle()方法,处理Job初始化事件initJobEvent
4. 启动客户端服务clientService
clientService.start()
5. 调用父类的serviceStart(),启动所有组件
super.serviceStart()
6. 调用startJobs()方法启动作业
startJobs();
这里也是经过了一些了状态变化,最后containerLauncher.submit(launch),调用yarnchaild的mian方法

YARNChaild

final Task taskFinal = task;
taskFinal.run(job, umbilical)
Task实现是MapTask和ReduceTask,run方法就进相应的逻辑

MAPTask

进入maptask的run方法
先进度条设置
然后判断是否cleanup
进入runNewMapper
1.生成TaskAttemptContextImpl实例,此实例中的Configuration就是job本身。
客户端上传任务到资源层,其中包括Jar包,配置文件,切片三个文件,container拿到可以实例化job

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

2.反射得到用户定义的Mapper实现类,也就是map函数的类

 org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
  (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

3.得到InputFormat实现类

 org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

默认TextInputFormat

  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }

4.得到当前task对应的InputSplit
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
splitIndex为JobImpl.transition的createSplits方法得到所有分片。通过切片文件位置,偏移得到该map获取的数据
5.通过InputFormat,得到对应的RecordReader,可以读取分片的一条数据。
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext)
private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
this.real = inputFormat.createRecordReader(split, taskContext);
为LineRecordReader

6.生成RecordWriter实例
如果没有reduce进入NewDirectOutputCollector
有进入NewOutputCollector
①.初始化了环形缓冲区
collector = createSortingCollector(job, reporter);
collector.init(context);
②.分区partitions

partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }

7.初始化input
input.initialize(split, mapperContext);
就是LineRecordReader的initialize方法
8.执行run
mapper.run(mapperContext)
里面分别是setup,map,cleanup,调用用户自定义方法
context.write写入环形缓冲区
9.关闭输出流
input.close();
溢写

MapOutputBuffer 环形缓冲区

####参数
kvbuffer:环形缓冲区
kvmeta:meta信息缓冲区
kvindex:下次要插入的meta信息的起始位置
kvstart:溢写时meta数据的起始位置
kvend:溢写时meta数据的结束位置
bufindex:raw数据的结束位置
bufstart:溢写时raw数据的起始位置
bufend:溢写时raw数据的结束位置
equator:缓冲区的中界点
spillper:当数据占用超过这个比例,会造成溢写,由配置“mapreduce.map.sort.spill.percent”指定,默认值是0.8
sortmb:kvbuffer占用的内存总量,单位是M,由配置“mapreduce.task.index.cache.limit.bytes”指定,默认值是100
indexCacheMemoryLimit:存放溢写文件信息的缓存大小,由参数“mapreduce.task.index.cache.limit.bytes”指定,单位是byte,默认值是1024*1024(1M)
bufferRemaining:buffer剩余空间,字节为单位
softLimit:字节单位的溢写阈值,超过之后需要写磁盘

####初始化init
sorters缓冲区内排序设置为快速排序
kvbuffer设置为sortmb的大小,默认100M
equator为0
kvindex=int)
(((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4
aligned = = pos - (pos % METASIZE)

bufstart = bufend = bufindex = equator
bufferRemaining为空=softLimit
softLimit = (int) (kvbuffer.length * spillper);默认80M

写入缓冲区collect方法
未溢出时,没有溢写阻塞,
bufferRemaining-16,kvmeta空间
keystart为现在bufindex值为0
key序列化
System.arraycopy(b, off, kvbuffer, bufindex, gaplen)
更新bufindex为这次写入key的位置,1(key长度值)+key的长度
kvbuffer值,key的长度+每个key的值
bufferRemaining 减去相应大小

valstart为现在bufindex
序列化value
更新bufindex为这次写入value的位置,1(value长度值)+value的长度
kvbuffer值,key的长度+每个key的值+value的长度+每个value的值
bufferRemaining 减去相应大小

kemeta赋值
kvmeta.put(kvindex + PARTITION, partition);分区,默认hash值分区
kvmeta.put(kvindex + KEYSTART, keystart);key起始位置
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
kvindex位置更新
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();(一直减少)
也就是往后移动16字节,4个int

当有溢写后,需要调整环形缓冲区参数

  final int avgRec = (int)
                                        (mapOutputByteCounter.getCounter() /
                                                mapOutputRecordCounter.getCounter());
                                // leave at least half the split     buffer for serialization data
                                // ensure that kvindex >= bufindex
                                final int distkvi = distanceTo(bufindex, kvbidx);
                                final int newPos = (bufindex +
                                        Math.max(2 * METASIZE - 1,
                                                Math.min(distkvi / 2,
                                                        distkvi / (METASIZE + avgRec) * METASIZE)))
                                        % kvbuffer.length;
                                setEquator(newPos);
                                bufmark = bufindex = newPos;
                                final int serBound = 4 * kvend;
                                // bytes remaining before the lock must be held and limits
                                // checked is the minimum of three arcs: the metadata space, the
                                // serialization space, and the soft limit
                                bufferRemaining = Math.min(
                                        // metadata max
                                        distanceTo(bufend, newPos),
                                        Math.min(
                                                // serialization max
                                                distanceTo(newPos, serBound),
                                                // soft limit
                                                softLimit)) - 2 * METASIZE;

####溢写
一直这么写,直到bufferRemaining<=0,或者数据少没有溢写,就是input.close时也就是flush
主要是sortAndSpill方法
按分区,进行文件生成
区内排序,快排
调用combiner方法
写文件

flush最后进行归并文件
mergeParts()

ReducTask

进入run方法
创建shuffleConsumerPlugin对象,也就是Shuffle类
三个过程copyPhase、sortPhase、reducePhase

copyPhase

shuffleConsumerPlugin.init(shuffleContext);
RawKeyValueIterator rIter = shuffleConsumerPlugin.run()
启动线程·监控map完成情况
EventFetcher线程,用来获取map完成的事件
LocalFetcher或者Fetcher获取map端输出数据线程

Fetcher线程

1、等待merge完成
merger.waitForResource()
2、获取host
host = scheduler.getHost()
3、拉取数据
copyFromHost(host)
getMapOutputURL
openShuffleUrl
connect
copyMapOutput这里会根据数据量创建MapOutput实例,一般创建的是InMemoryMapOutput,即将数据存到内存了(当数据量很少的时候会直接拷贝到磁盘,即创建OnDiskMapOutput)
mapOutput.shuffle里面有个doShuffle方法分别是InMemoryMapOutput和OnDiskMapOutput实现
InMemoryMapOutput中复制数据IOUtils.readFully(input, memory, 0, memory.length)
MergeThread.startMerge将数据merge到磁盘。
mapreduce.job.reduce.slowstart.completedmaps:在maptask完成了一定百分比后将触发fetch,默认为0.05

mapreduce.reduce.shuffle.read.timeout:fetch读数据的timeout时间,默认为3分钟

mapreduce.reduce.shuffle.maxfetchfailures:fetch最大的失败次数,默认为10次

mapreduce.reduce.shuffle.connect.timeout:fetch建立连接的timeout时间,默认也是3分钟

mapreduce.reduce.shuffle.parallelcopies:同时创建的fetch线程个数

这些线程结束,copy阶段完成

sortPhase

MergeManager Implmerger.close()得到RawKeyValueIterator

reducePhase

RawComparator comparator = job.getOutputValueGroupingComparator(); 分组比较器
runNewReducer
获取kv数据
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator()
createReduceContext创建上下文
reducer.run执行reduce

写文件

reduceContext.write->LineRecordWriter.write

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐