项目完整解读源码地址:https://gitee.com/duhai123/xxl_job_study

执行器里面,实际上是在应用中内嵌了一个jetty服务器, 服务器在xxlJobExecutor 初始化的时候启动。 本次示例代码中是由spring-boot 中截取而来

第一步:初始化加载执行器配置XxlJobConfig

/**
	 * 注入bean
	 * @return
	 */
	@Bean(initMethod = "start", destroyMethod = "destroy")
	public XxlJobSpringExecutor xxlJobExecutor() {
		logger.info(">>>>>>>>>>> xxl-job config init.");
		XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
		xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
		xxlJobSpringExecutor.setAppName(appName);
		xxlJobSpringExecutor.setIp(ip);
		xxlJobSpringExecutor.setPort(port);
		xxlJobSpringExecutor.setAccessToken(accessToken);
		xxlJobSpringExecutor.setLogPath(logPath);
		xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

		return xxlJobSpringExecutor;
	}

执行器Spring版本实现:第二步加载执行器:XxlJobSpringExecutor

/**
	 * 开始
	 * 
	 * @see com.xxl.job.core.executor.XxlJobExecutor#start()
	 */
	@Override
	public void start() throws Exception {

		//初始化JobHandler的仓库
		initJobHandlerRepository(applicationContext);

		// refresh GlueFactory
		GlueFactory.refreshInstance(1);

		//XxlJobExecutor启动
		super.start();
	}

initJobHandlerRepository:注册jobhandler到map中(@ComponentScan(basePackages = "com.xxl.job.executor.service.jobhandler")中扫描到的)

/**
	 * 注册jobhandler到map中
	 * 
	 * @param applicationContext
	 */
	private void initJobHandlerRepository(ApplicationContext applicationContext) {
		if (applicationContext == null) {
			return;
		}

		// 获取有JobHandler注解的所有类
		Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);

		if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
			for (Object serviceBean : serviceBeanMap.values()) {
				if (serviceBean instanceof IJobHandler) {
					//获取注解中name的值@JobHandler(value="commandJobHandler")
					String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
					IJobHandler handler = (IJobHandler) serviceBean;
					
					//从jobHandlerRepository中找,如果存在就抛出异常
					if (loadJobHandler(name) != null) {
						throw new RuntimeException("xxl-job jobhandler naming conflicts.");
					}
					//注册到jobHandlerRepository的static的Map中去,方便在调用的时候找到
					registJobHandler(name, handler);
				}
			}
		}
	}

XxlJobExecutor:父执行器,最终都是这个启动

/**
	 * 初始化
	 * 
	 * @throws Exception
	 */
	public void start() throws Exception {

		// 初始化log日志
		XxlJobFileAppender.initLogPath(logPath);

		// init admin-client
		// 初始化调用调度中心的client列表
		initAdminBizList(adminAddresses, accessToken);

		// init JobLogFileCleanThread
		// 初始化日志文件清理线程
		JobLogFileCleanThread.getInstance().start(logRetentionDays);

		// init TriggerCallbackThread
		// 初始化触发器回调线程(用RPC回调调度中心接口)
		TriggerCallbackThread.getInstance().start();

		// init executor-server
		// 初始化执行器服务
		port = port > 0 ? port : NetUtil.findAvailablePort(9999);
		// 获取ip
		ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
		initRpcProvider(ip, port, appName, accessToken);
	}

x先看initAdminBizList:初始化调用调度中心的client列表

// -------- admin-client (rpc invoker) --------调度中心client(用这个去调用调度中心的接口)--------TODO---------------
	// 调度中心client列表
	private static List<AdminBiz> adminBizList;

	/**
	 * 初始化调度中心
	 * 
	 * @param adminAddresses
	 *            xxl.job.admin.addresses
	 * @param accessToken
	 *            xxl.job.accessToken
	 * @throws Exception
	 */
	private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
		if (adminAddresses != null && adminAddresses.trim().length() > 0) {
			for (String address : adminAddresses.trim().split(",")) {
				if (address != null && address.trim().length() > 0) {

					String addressUrl = address.concat(AdminBiz.MAPPING);

					// 获取调度中心的AdminBiz,用这个类可以调用到调度中心的RPC接口
					AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY,
							Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC, AdminBiz.class, null,
							10000, addressUrl, accessToken, null).getObject();

					if (adminBizList == null) {
						adminBizList = new ArrayList<AdminBiz>();
					}
					adminBizList.add(adminBiz);
				}
			}
		}
	}

TriggerCallbackThread:初始化触发器回调线程(用RPC回调调度中心接口)

/**
	 * 开始执行,不停的回调
	 */
	public void start() {

		// valid
		if (XxlJobExecutor.getAdminBizList() == null) {
			logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
			return;
		}

		// callback
		triggerCallbackThread = new Thread(new Runnable() {
			@Override
			public void run() {

				// 正常的回调
				while (!toStop) {
					try {
						HandleCallbackParam callback = getInstance().callBackQueue.take();
						if (callback != null) {

							// callback list param
							List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
							// int drainToNum =
							// 移除此队列中所有可用的元素,并将它们添加到给定 callbackParamList 中
							getInstance().callBackQueue.drainTo(callbackParamList);
							callbackParamList.add(callback);

							// callback, will retry if error
							if (callbackParamList != null && callbackParamList.size() > 0) {
								doCallback(callbackParamList);
							}
						}
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}

				// last callback===停止后最后的回调
				try {
					List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
					// int drainToNum =
					// 移除此队列中所有可用的元素,并将它们添加到给定 callbackParamList 中
					getInstance().callBackQueue.drainTo(callbackParamList);
					if (callbackParamList != null && callbackParamList.size() > 0) {
						doCallback(callbackParamList);
					}
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
				logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

			}
		});
		triggerCallbackThread.setDaemon(true);
		triggerCallbackThread.start();

		// 失败回调文件重试回调进程
		triggerRetryCallbackThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// 读取失败回调文件,重试进行回调操作
						retryFailCallbackFile();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						logger.warn(">>>>>>>>>>> xxl-job, executor retry callback thread interrupted, error msg:{}",
								e.getMessage());
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
			}
		});
		triggerRetryCallbackThread.setDaemon(true);
		triggerRetryCallbackThread.start();

	}

initRpcProvider:初始化执行器服务

/**
	 * 初始化RpcProvider----- TODO ----<br/>
	 * (与调度中心不同,这边内嵌了一个jetty服务器<br/>
	 * 有一个web端口server.port,一个RPC接口端口xxl.job.executor.port)
	 * 
	 * @param ip
	 * @param port
	 * @param appName
	 * @param accessToken
	 * @throws Exception
	 */
	private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
		// init invoker factory
		xxlRpcInvokerFactory = new XxlRpcInvokerFactory();

		// init, provider factory 初始化提供者工厂
		String address = IpUtil.getIpPort(ip, port);
		Map<String, String> serviceRegistryParam = new HashMap<String, String>();
		serviceRegistryParam.put("appName", appName);
		serviceRegistryParam.put("address", address);
		// 初始化提供者工厂
		xxlRpcProviderFactory = new XxlRpcProviderFactory();
		// 最后两个参数与执行器注册线程的相关 TODO
		xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port,
				accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

		// add services 增加服务接口和服务实现,供给调用中心调用
		xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

		// RpcProvider工厂启动:
		// 1.执行器注册线程serviceRegistry启动
		// 2.内置jetty service启动
		xxlRpcProviderFactory.start();

	}

1.执行器注册线程serviceRegistry启动:

/**
		 * xxlRpcProviderFactory.start();内部启用:远程注册到调度中心
		 * 
		 * @see com.xxl.rpc.registry.ServiceRegistry#start(java.util.Map)
		 */
		@Override
		public void start(Map<String, String> param) {
			// start registry:开始执行器注册线程
			ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
		}

ExecutorRegistryThread:执行器注册线程

/**
	 * 执行器信息注册到注册中心
	 * 
	 * @param appName
	 * @param address
	 */
	public void start(final String appName, final String address) {

		// valid
		if (appName == null || appName.trim().length() == 0) {
			logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
			return;
		}
		if (XxlJobExecutor.getAdminBizList() == null) {
			logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
			return;
		}

		// 注册线程
		registryThread = new Thread(new Runnable() {
			@Override
			public void run() {

				// registry
				while (!toStop) {// 每隔一个时间注册一次,保证执行器的活着状态
					try {
						RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(),
								appName, address);
						for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
							try {
								// 执行器信息注册到注册中心(XxlRpcReferenceBean 来代理的adminBiz)
								ReturnT<String> registryResult = adminBiz.registry(registryParam);
								if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
									registryResult = ReturnT.SUCCESS;
									logger.info(
											">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}",
											new Object[] { registryParam, registryResult });
									break;
								} else {
									logger.info(
											">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}",
											new Object[] { registryParam, registryResult });
								}
							} catch (Exception e) {
								logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
							}

						}
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}

					try {
						// 休息一个时间
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}",
								e.getMessage());
					}
				}

				// 最后结束的时候,把执行器从远程移除
				try {
					RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName,
							address);
					for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
						try {
							// 执行器从注册中心删除
							ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
							if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
								registryResult = ReturnT.SUCCESS;
								logger.info(
										">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}",
										new Object[] { registryParam, registryResult });
								break;
							} else {
								logger.info(
										">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}",
										new Object[] { registryParam, registryResult });
							}
						} catch (Exception e) {
							logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam,
									e);
						}

					}
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
				logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

			}
		});
		registryThread.setDaemon(true);
		registryThread.start();
	}

2.内置jetty service启动(大概看一看,仔细研究请看xxl-pc)

/**
	 * JettyServer启动
	 * 
	 * @see com.xxl.rpc.remoting.net.Server#start(com.xxl.rpc.remoting.provider.XxlRpcProviderFactory)
	 */
	@Override
	public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {

		thread = new Thread(new Runnable() {

			@Override
			public void run() {

				// The Server
				server = new org.eclipse.jetty.server.Server(new QueuedThreadPool()); // default maxThreads 200,
																						// minThreads 8
				// TODO, thread config, change to async servlet

				// HTTP connector
				ServerConnector connector = new ServerConnector(server);
				/*
				 * if (ip!=null && ip.trim().length()>0) { // TODO, support set registry ip、and bind ip
				 * //connector.setHost(ip); // The network interface this connector binds to as an IP address or a
				 * hostname. If null or 0.0.0.0, then bind to all interfaces. }
				 */
				connector.setPort(xxlRpcProviderFactory.getPort());
				server.setConnectors(new Connector[] { connector });

				// Set a handler
				HandlerCollection handlerc = new HandlerCollection();
				// 设置一个处理器JettyServerHandler(重点)
				handlerc.setHandlers(new Handler[] { new JettyServerHandler(xxlRpcProviderFactory) });
				server.setHandler(handlerc);

				try {
					server.start();

					logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}",
							JettyServer.class.getName(), xxlRpcProviderFactory.getPort());
					onStarted();

					server.join();
				} catch (Exception e) {
					logger.error(">>>>>>>>>>> xxl-rpc remoting server start error.", e);
				} finally {
					try {
						stop();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}
			}
		});
		thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
		thread.start();
	}

JettyServerHandler 接收请求后的处理流程:

/**
	 * 处理方法{@inheritDoc}
	 * 
	 * @see org.eclipse.jetty.server.Handler#handle(java.lang.String, org.eclipse.jetty.server.Request,
	 *      javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
	 */
	@Override
	public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
			throws IOException, ServletException {

		if ("/services".equals(target)) { // services mapping

			StringBuffer stringBuffer = new StringBuffer("<ui>");
			for (String serviceKey : xxlRpcProviderFactory.getServiceData().keySet()) {
				stringBuffer.append("<li>").append(serviceKey).append(": ")
						.append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
			}
			stringBuffer.append("</ui>");

			writeResponse(baseRequest, response, stringBuffer.toString().getBytes());
			return;
		} else { // default remoting mapping = 默认的远程mapping

			// request parse
			XxlRpcRequest xxlRpcRequest = null;
			try {
				// 获取远程请求信息
				xxlRpcRequest = parseRequest(request);
			} catch (Exception e) {
				writeResponse(baseRequest, response, ThrowableUtil.toString(e).getBytes());
				return;
			}

			// invoke=执行对应类的方法
			XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

			// response-serialize + response-write 序列化返回信息
			byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
			// 回写
			writeResponse(baseRequest, response, responseBytes);
		}

	}

通过上面的handle中的代码可以知道,主要的执行逻辑在xxlRpcProviderFactory.invokeService的方法中, 

/**
	 * invoke service<br/>
	 * 执行对应类的方法
	 *
	 * @param xxlRpcRequest
	 * @return
	 */
	public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

		// make response
		XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
		xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

		// match service bean
		String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
		Object serviceBean = serviceData.get(serviceKey);

		// valid
		if (serviceBean == null) {
			xxlRpcResponse.setErrorMsg("The serviceKey[" + serviceKey + "] not found.");
			return xxlRpcResponse;
		}

		if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3 * 60 * 1000) {
			xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
			return xxlRpcResponse;
		}
		if (accessToken != null && accessToken.trim().length() > 0
				&& !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
			xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
			return xxlRpcResponse;
		}

		/*
		 * 通过反射方式来执行方法
		 */
		try {
			Class<?> serviceClass = serviceBean.getClass();
			String methodName = xxlRpcRequest.getMethodName();
			Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
			Object[] parameters = xxlRpcRequest.getParameters();

			Method method = serviceClass.getMethod(methodName, parameterTypes);
			method.setAccessible(true);
			// 执行方法
			Object result = method.invoke(serviceBean, parameters);

			xxlRpcResponse.setResult(result);
		} catch (Throwable t) {
			logger.error("xxl-rpc provider invokeService error.", t);
			xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
		}

		return xxlRpcResponse;
	}

通过调度中心发过来的参数,以及执行器的处理逻辑,我们有理由可以得出此时是执行的是ExecutorBizImpl中的run方法

/**
	 * 运行某一个触发器: 1.不动/创建/替换一个任务执行线程
	 * 
	 * @see com.xxl.job.core.biz.ExecutorBiz#run(com.xxl.job.core.biz.model.TriggerParam)
	 */
	@Override
	public ReturnT<String> run(TriggerParam triggerParam) {
		// load old:jobHandler + jobThread
		// 获取是否有这个JobId的线程
		JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
		// handler
		IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
		// 删除旧的线程的原因
		String removeOldReason = null;

		/*
		 * valid:jobHandler + jobThread 验证jobHandler 和 jobThread
		 */
		GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
		if (GlueTypeEnum.BEAN == glueTypeEnum) {// bean模式

			// 查询jobHandlerRepository里面是否有该名字的JobHandler
			IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

			// valid old jobThread
			if (jobThread != null && jobHandler != newJobHandler) {// 两个不一样
				// change handler, need kill old thread
				removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

				jobThread = null;
				jobHandler = null;
			}

			// valid handler
			if (jobHandler == null) {
				jobHandler = newJobHandler;
				if (jobHandler == null) {
					return new ReturnT<String>(ReturnT.FAIL_CODE,
							"job handler [" + triggerParam.getExecutorHandler() + "] not found.");
				}
			}

		} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

			// valid old jobThread
			if (jobThread != null
					&& !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler())
							.getGlueUpdatetime() == triggerParam.getGlueUpdatetime())) {
				// change handler or gluesource updated, need kill old thread
				removeOldReason = "change job source or glue type, and terminate the old job thread.";

				jobThread = null;
				jobHandler = null;
			}

			// valid handler
			if (jobHandler == null) {
				try {
					IJobHandler originJobHandler = GlueFactory.getInstance()
							.loadNewInstance(triggerParam.getGlueSource());
					jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
					return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
				}
			}
		} else if (glueTypeEnum != null && glueTypeEnum.isScript()) {

			// valid old jobThread
			if (jobThread != null && !(jobThread.getHandler() instanceof ScriptJobHandler
					&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime() == triggerParam
							.getGlueUpdatetime())) {
				// change script or gluesource updated, need kill old thread
				removeOldReason = "change job source or glue type, and terminate the old job thread.";

				jobThread = null;
				jobHandler = null;
			}

			// valid handler
			if (jobHandler == null) {
				jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(),
						triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
			}
		} else {
			return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
		}

		// executor block strategy
		if (jobThread != null) {

			// 阻塞处理策略
			ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum
					.match(triggerParam.getExecutorBlockStrategy(), null);

			if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {// 并行
				// discard when running
				if (jobThread.isRunningOrHasQueue()) {
					return new ReturnT<String>(ReturnT.FAIL_CODE,
							"block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
				}
			} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {// 覆盖
				// kill running jobThread
				if (jobThread.isRunningOrHasQueue()) {
					removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
					jobThread = null;
				}
			} else {
				// just queue trigger
			}
		}

		// replace thread (new or exists invalid)
		if (jobThread == null) {// 新建一个jobThread代替老的
			jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
		}

		// push data to queue:触发器增加一次要触发执行的任务
		ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
		return pushResult;
	}

通过上面我们可以发现, 执行executorBiz的run 方法的时候, 首先会通过JOBID,从本地线程库里面获取该任务对应的线程,同时,如果任务的JobHandler有更新的话,

那么会自动使用最新的jobHandler , 同时根据任务的阻塞策略。 执行不同的操作。 最终,如果是第一次执行任务的时候,系统会分配给改任务一个线程,同时启动该线程。

接下来,可以在具体看一下JobThread 的run方法,看下最终的任务是如何执行的。

@Override
	public void run() {

		// init
		try {
			// 执行IJobHandler 中的init方法,以后如果有一些,在执行handler之前的初始化的工作,可以覆写这个方法
			handler.init();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		// execute= stop 为fasle的时候执行
		while (!toStop) {
			running = false;
			// 执行次数
			idleTimes++;

			TriggerParam triggerParam = null;
			ReturnT<String> executeResult = null;
			try {
				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
				// 从linkBlockingQueue中获取数据,如果3秒获取不到,则返回null
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
				if (triggerParam != null) {
					running = true;
					// 将运行次数清空,保证运行90秒空闲之后会被移除
					idleTimes = 0;
					// 去掉这条数据
					triggerLogIdSet.remove(triggerParam.getLogId());

					// log filename, like "logPath/yyyy-MM-dd/9999.log"
					// 创建日志
					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()),
							triggerParam.getLogId());
					XxlJobFileAppender.contextHolder.set(logFileName);

					// 写入分片信息, 将当前机器的分片标记和分片总数写入到ShardingUtil中,到时候,可以在handler中通过这个工具类获取
					ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(),
							triggerParam.getBroadcastTotal()));

					// execute
					XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:"
							+ triggerParam.getExecutorParams());

					// 超时时间大于0
					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout
						Thread futureThread = null;
						try {
							final TriggerParam triggerParamTmp = triggerParam;

							// 开一个线程执行
							FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(
									new Callable<ReturnT<String>>() {
										@Override
										public ReturnT<String> call() throws Exception {
											// 执行。。。
											return handler.execute(triggerParamTmp.getExecutorParams());
										}
									});
							futureThread = new Thread(futureTask);
							futureThread.start();
							// 获取执行结果
							executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);

						} catch (TimeoutException e) {

							XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
							XxlJobLogger.log(e);

							executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(),
									"job execute timeout ");
						} finally {
							futureThread.interrupt();
						}
					} else {// 否则直接执行
						// just execute
						executeResult = handler.execute(triggerParam.getExecutorParams());
					}

					if (executeResult == null) {
						executeResult = IJobHandler.FAIL;
					}
					XxlJobLogger
							.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:"
									+ executeResult);
				} else {
					if (idleTimes > 30) {
						// 每3秒获取一次数据,获取30次都没有获取到数据之后,则现场被清除
						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
					}
				}
			} catch (Throwable e) {
				if (toStop) {
					XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				StringWriter stringWriter = new StringWriter();
				e.printStackTrace(new PrintWriter(stringWriter));
				String errorMsg = stringWriter.toString();
				executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);

				XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg
						+ "<br>----------- xxl-job job execute end(error) -----------");
			} finally {
				if (triggerParam != null) {
					// callback handler info
					if (!toStop) {
						// handler执行完成之后,将结果写入到日志里面去,
						// 就是在执行器启动的时候,会建立一个线程,用来实时处理日志,此处是将结果和logID放入到队列里面去,
						TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),
								triggerParam.getLogDateTim(), executeResult));
					} else {
						// is killed
						ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE,
								stopReason + " [job running,killed]");
						// 任务执行后,放入回调队列
						TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),
								triggerParam.getLogDateTim(), stopResult));
					}
				}
			}
		}

		// callback trigger request in queue
		// 当现场被终止之后,队列里面剩余的未执行的任务,将被终止的这些任务放入队列,供日志监控线程来处理,回调给调度中心
		while (triggerQueue != null && triggerQueue.size() > 0) {
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam != null) {
				// is killed
				ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE,
						stopReason + " [job not executed, in the job queue, killed.]");
				// 任务执行后,放入回调队列
				TriggerCallbackThread.pushCallBack(
						new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
			}
		}

		// destroy
		try {
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
	}

最后来看一下,TriggerCallbackThread.pushCallBack ()这个方法,将本次任务记录的日志ID和处理结果放入队列中去了

public void start() {

		// valid
		if (XxlJobExecutor.getAdminBizList() == null) {
			logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
			return;
		}

		// callback
		triggerCallbackThread = new Thread(new Runnable() {
			@Override
			public void run() {

				// 正常的回调
				while (!toStop) {
					try {
						HandleCallbackParam callback = getInstance().callBackQueue.take();
						if (callback != null) {

							// callback list param
							List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
							// int drainToNum =
							// 移除此队列中所有可用的元素,并将它们添加到给定 callbackParamList 中
							getInstance().callBackQueue.drainTo(callbackParamList);
							callbackParamList.add(callback);

							// callback, will retry if error
							if (callbackParamList != null && callbackParamList.size() > 0) {
                                //进行回调操作
								doCallback(callbackParamList);
							}
						}
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}

				// last callback===停止后最后的回调
				try {
					List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
					// int drainToNum =
					// 移除此队列中所有可用的元素,并将它们添加到给定 callbackParamList 中
					getInstance().callBackQueue.drainTo(callbackParamList);
					if (callbackParamList != null && callbackParamList.size() > 0) {
						doCallback(callbackParamList);
					}
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
				logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

			}
		});
		triggerCallbackThread.setDaemon(true);
		triggerCallbackThread.start();

		// 失败回调文件重试回调进程
		triggerRetryCallbackThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// 读取失败回调文件,重试进行回调操作
						retryFailCallbackFile();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						logger.warn(">>>>>>>>>>> xxl-job, executor retry callback thread interrupted, error msg:{}",
								e.getMessage());
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
			}
		});
		triggerRetryCallbackThread.setDaemon(true);
		triggerRetryCallbackThread.start();
	}

doCallback:

/**
	 * do callback, will retry if error 回调操作
	 * 
	 * @param callbackParamList
	 */
	private void doCallback(List<HandleCallbackParam> callbackParamList) {
		boolean callbackRet = false;
		// callback, will retry if error
		// 获取调度中心的adminBiz列表,在执行器启动的时候,初始化的,
		for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
			try {
				// 这里的adminBiz 调用的callback方法,因为是通过NetComClientProxy 这个factoryBean创建的代理对象,
				// 在getObject方法中,最终是没有调用的目标类方法的invoke的。 只是将目标类的方法名,参数,类名,等信息发送给调度中心了
				// 发送的地址调度中心的接口地址是 :“调度中心IP/api” 这个接口 。 这个是在执行器启动的时候初始化设置好的。
				// 调度中心的API接口拿到请求之后,通过参数里面的类名,方法,参数,反射出来一个对象,然后invoke, 最终将结果写入数据库
				ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
				if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
					callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
					// 因为调度中心是集群式的,所以只要有一台机器返回success,那么就算成功,直接break
					callbackRet = true;
					break;
				} else {
					// callback log 回调日志
					callbackLog(callbackParamList,
							"<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
				}
			} catch (Exception e) {
				// callback log 回调日志
				callbackLog(callbackParamList,
						"<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
			}
		}
		if (!callbackRet) {
			// 增加失败回调文件信息
			appendFailCallbackFile(callbackParamList);
		}
	}

retryFailCallbackFile:失败回调文件,进行重试回调操作

/**
	 * 读取失败回调文件,重试进行回调操作
	 */
	private void retryFailCallbackFile() {
		// load and clear file
		List<String> fileLines = FileUtil.loadFileLines(failCallbackFileName);
		FileUtil.deleteFile(failCallbackFileName);

		// parse
		List<HandleCallbackParam> failCallbackParamList = new ArrayList<>();
		if (fileLines != null && fileLines.size() > 0) {
			for (String line : fileLines) {
				List<HandleCallbackParam> failCallbackParamListTmp = JacksonUtil.readValue(line, List.class,
						HandleCallbackParam.class);
				if (failCallbackParamListTmp != null && failCallbackParamListTmp.size() > 0) {
					failCallbackParamList.addAll(failCallbackParamListTmp);
				}
			}
		}

		// retry callback, 100 lines per page
		if (failCallbackParamList != null && failCallbackParamList.size() > 0) {
			int pagesize = 100;
			List<HandleCallbackParam> pageData = new ArrayList<>();
			for (int i = 0; i < failCallbackParamList.size(); i++) {
				pageData.add(failCallbackParamList.get(i));
				if (i > 0 && i % pagesize == 0) {
					doCallback(pageData);
					pageData.clear();
				}
			}
			if (pageData.size() > 0) {
				doCallback(pageData);
			}
		}
	}
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐