基于spring schedule、zookeeper和VUE实现的可视化分布式定时任务工具
自己用Java写了个可视化的定时任务工具,取名sundial(日晷)。定时任务的原理就是spring schedule;分布式锁基于zookeeper实现,客户端采用Netflix开源的Curator。JDK用的17,最新的长期支持版本(LTS),数据库是MySQL,同样是最新的MySQL8。囿于篇幅,下面只介绍主要的类和数据库设计。查看详细代码,请移步GitHub,GitHub - rivery
石英(Quartz)晶体在电流作用下,会产生极规律的振动,科学家利用其这种特性发明了石英钟表。相较于结构复杂、造价高昂、保养麻烦的机械钟表,后者简单可靠,经济耐用,有种大音希声、大象无形的朴素美。Java生态中,也有自己的石英表,即Quartz,我也用Java写了个可视化的定时任务工具GitHub - riveryue/sundialContribute to riveryue/sundial development by creating an account on GitHub.https://github.com/riveryue/sundial。Quartz珠玉在前,sundial瓦石难当,但自己写个简单的,也能加深对架构的理解。说回正题,sundial定时任务的原理就是spring schedule;分布式锁基于zookeeper实现,客户端采用Netflix开源的Curator。JDK用的21,最新的长期支持版本(LTS),数据库是MySQL。
所谓分布式锁,即独立于整个分布式环境之外的全局且唯一的锁的添加、释放的机制。简单的分布式锁可由数据库实现,比如MySQL。但其性能显然不如基于内存的redis、zookeeper。用redis也可以做,redis是基于内存且支持持久化的键值对数据库。redis加锁本质上就是调用其set命令来对同一key设置键值对,value的话可以用当前线程的线程id,在解锁时对value做校验以避免释放其他线程的锁,再给键值对设置一个过期时间以实现锁超时功能。但有可能到了过期时间,持有锁的线程还没执行完成,这时锁已被释放,其他线程获取了锁,再对同一共享资源进行操作,就会出现bug;若是持有锁的redis master节点还没把锁信息同步给其他slave节点就宕机或因网络抖动而与client断开,而新选举出来的master节点如果收到某client对同一锁的加锁请求,就会加锁成功,便会导致同一锁被两个线程同时持有的问题,而线程安全问题相关的bug难以排查。可以用Redisson搭配Lua脚本来实现锁超时功能,基于其watchdog看门狗机制,每隔一定时间,默认为30秒。如果某客户端持有锁超过30秒,watchdog就会每隔10秒再把key的过期时间再设为30秒。这样,某线程执行的慢,一直持有锁,其他线程也不会获取到当前线程持有的锁。
而zookeeper实现分布式锁,就是利用其临时有序节点特性。在某一指定目录下创建节点,并判断节点序号是否为当前目录下最小,若是,则视为创建锁成功。否则,就对前一个节点添加一个监听事件。如果锁释放,会通知后一个节点,后一个节点再判断自己序号是否最小,最小就获得锁。这里的原理类似JUC里的AQS队列同步器的公平锁模式,AQS里有个双向链表,持有锁的线程在头节点,其他的等着抢锁的线程就在连在后面,持有锁的线程释放锁后会通知后一个节点的线程来竞争锁。
囿于篇幅,下面只介绍主要的类和数据库设计。查看详细代码,请移步GitHub,GitHub - riveryue/sundial。
使用时需要在任务类上加上自定义注解SundialTask,定时任务不能重复,所以注解的name属性在整个工程需保持唯一,否则工程启动时会报错,提示有名称重复的定时任务。
以下为SundialTask源码:
package sundial.annotation;
import java.lang.annotation.*;
/**
* @author yao
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SundialTask {
String name();
}
下图为示例,线程休眠5秒是为了模拟真实业务系统中的定时任务,用以测试分布式环境下同一任务是否只会执行一次。如果不休眠一段时间,直接在里面打个标记执行成功的日志,因为执行的太快,再加上分布式环境下各个机器的本地时间可能并不一致,存在时间差,而这时间差又大于job的执行耗时,线程间就可能不会存在锁竞争,导致每台服务器上都会执行一遍定时任务。
package sundial;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import sundial.annotation.SundialTask;
import java.util.Date;
/**
* @author yao
*/
@Component
@Slf4j
public class Task2 implements SundialExecute {
@SundialTask(name = "test2")
@Override
public void execute() {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
log.error("error in schedule ", e);
}
log.info("task2 {}", new Date());
log.info("schedule execute successfully");
}
}
任务类实现了SundialExecute接口,它扩展了Runnable接口,用于定义定时任务的执行逻辑。
在SundialExecute接口中,定义了一个execute方法,这个方法是具体的任务执行逻辑,需要由实现SundialExecute接口的类来实现。
此外,SundialExecute接口还重写了Runnable接口的run方法。在这个方法中,首先通过Spring的工具类获取TaskConfService的bean(TaskConfService定义了定时任务的crud等方法),然后遍历当前类(实现了SundialExecute接口的类)的所有方法,找出带有SundialTask注解的方法,并获取该注解的name属性值。然后,使用这个name值从TaskConfService服务中查询对应的任务配置信息。如果任务配置信息存在,并且任务状态不是禁用状态,那么就尝试获取分布式锁,并执行execute方法。
如果获取分布式锁成功,那么就执行execute方法,并打印日志信息。如果获取分布式锁失败,那么就跳过任务执行,并打印警告日志。在执行execute方法的过程中,如果发生任何异常,都会打印错误日志。无论任务执行是否成功,最后都会尝试释放分布式锁,并打印相应的日志信息。
总的来说,SundialExecute接口定义了定时任务的执行逻辑,并提供了任务执行状态的检查,以及分布式锁的获取和释放等功能。
package sundial;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sundial.annotation.SundialTask;
import sundial.config.CuratorFrameworkConfig;
import sundial.constant.TaskStatus;
import sundial.dto.TaskConfDTO;
import sundial.service.TaskConfService;
import sundial.utils.SpringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
/**
* @author yao
*/
public interface SundialExecute extends Runnable {
Logger logger = LoggerFactory.getLogger(SundialExecute.class);
void execute();
/**
* valid status of schedule job if it's available
*/
@Override
default void run() {
TaskConfService taskConfService = SpringUtils.getBean(TaskConfService.class);
Method[] declaredMethods = this.getClass().getDeclaredMethods();
String annoVal = StringUtils.EMPTY;
for (Method declaredMethod : declaredMethods) {
boolean annotationPresent = declaredMethod.isAnnotationPresent(SundialTask.class);
if (annotationPresent) {
SundialTask methodAnno = declaredMethod.getAnnotation(SundialTask.class);
annoVal = methodAnno.name();
}
}
TaskConfDTO taskConfDTO = taskConfService.queryByTaskName(annoVal);
if (taskConfDTO != null && taskConfDTO.getStatus().equals(TaskStatus.DISABLE)) {
return;
}
String zkLockPath = "/distributeLock";
CuratorFramework client = SpringUtils.getBean(CuratorFrameworkConfig.class).curatorFramework();
final InterProcessMutex mutex = new InterProcessMutex(client, zkLockPath);
try {
boolean flag = mutex.acquire(1, TimeUnit.SECONDS);
if (flag) {
execute();
logger.info("Acquired lock successfully, executing task...");
} else {
logger.warn("Failed to acquire lock, task execution skipped.");
}
} catch (Exception e) {
logger.error("An error occurred while trying to acquire lock or execute task", e);
} finally {
try {
mutex.release();
logger.info("Lock released successfully.");
} catch (Exception e) {
logger.error("An error occurred while trying to release lock", e);
}
}
}
}
底层还有个关于spring的配置类ScheduledConfig,它实现了ApplicationContextAware,SmartInitializingSingleton和SchedulingConfigurer接口,主要负责定时任务的配置和管理。
主要功能如下:
1. 在Spring容器初始化完成后,通过afterSingletonsInstantiated方法,扫描所有SundialExecute类型的Bean,找出其中带有SundialTask注解的方法,通过反射机制解析这些方法,并将这些方法注册为定时任务。
2. 在configureTasks方法中,为每个注册的定时任务创建一个ScheduledFuture,并将其保存在scheduledFutures这个map中。这样可以在后续需要时取消或重新这些任务。
3. restartJob方法,用于重新启动一个定时任务。这个方法会先取消当前的任务,然后创建一个新的任务。这样的话,修改定时任务(比如修改其状态,即启用和禁用;修改其执行时间间隔,即cron表达式)后就能即时生效。
总之,利用Spring的依赖注入和AOP功能,使得定时任务的管理更加灵活和方便;通过注解的方式定义定时任务,使得代码更加简洁,易于理解。定时任务类就只需实现SundialExecute接口,并在其execute方法(该方法须用SundialTask注解修饰)中编写业务逻辑。这样的话,对业务侵入较小,开发人员可以专注于功能实现,不必在各种配置地狱泥足深陷。
package sundial.config;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronTrigger;
import sundial.SundialExecute;
import sundial.TaskPool;
import sundial.annotation.SundialTask;
import sundial.dto.TaskConfDTO;
import sundial.service.TaskConfService;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
* @author yao
*/
@Slf4j
@Configuration
public class ScheduledConfig implements ApplicationContextAware, SmartInitializingSingleton, SchedulingConfigurer {
private static ApplicationContext applicationContext;
private static final HashSet<String> TASK_NAME_LIST = new HashSet<>();
@Autowired
private TaskPool taskPool;
@Autowired
private TaskConfService taskConfService;
private TaskScheduler taskScheduler;
private static HashMap<SundialExecute, String> methodHashMap = Maps.newHashMap();
private Map<String, ScheduledFuture<?>> scheduledFutures = new HashMap<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
ScheduledConfig.applicationContext = applicationContext;
}
@Override
public void afterSingletonsInstantiated() {
if (applicationContext == null) {
return;
}
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(SundialExecute.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, SundialTask> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
(MethodIntrospector.MetadataLookup<SundialTask>) method -> AnnotatedElementUtils.findMergedAnnotation(method, SundialTask.class));
} catch (Throwable ex) {
log.error("task resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods == null || annotatedMethods.isEmpty()) {
continue;
}
for (Map.Entry<Method, SundialTask> taskEntry : annotatedMethods.entrySet()) {
Method executeMethod = taskEntry.getKey();
SundialTask task = taskEntry.getValue();
registerJobHandler(task, bean, executeMethod);
methodHashMap.put((SundialExecute) bean, task.name());
}
}
}
protected void registerJobHandler(SundialTask sundialTask, Object bean, Method executeMethod) {
if (sundialTask == null) {
return;
}
String taskName = sundialTask.name();
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (taskName.trim().isEmpty()) {
throw new RuntimeException("task name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (!TASK_NAME_LIST.add(taskName)) {
throw new RuntimeException("task can't be duplicate in whole project, [" + taskName + "] already exist.");
}
SundialExecute job = null;
try {
job = (SundialExecute) clazz.getConstructor().newInstance();
} catch (InstantiationException e) {
log.error("occur error in create object via reflect ", e);
//todo create custom exception
throw new RuntimeException("occur error in create object via reflect");
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
log.error("occur error in call method via reflect ", e);
}
taskPool.put(taskName, job);
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
this.taskScheduler = taskRegistrar.getScheduler();
for (Map.Entry<SundialExecute, String> entry : methodHashMap.entrySet()) {
ScheduledFuture<?> future = taskScheduler.schedule(entry.getKey(), triggerContext -> {
TaskConfDTO taskConfDTO = taskConfService.queryByTaskName(entry.getValue());
return new CronTrigger(taskConfDTO.getCron()).nextExecutionTime(triggerContext);
});
scheduledFutures.put(entry.getValue(), future);
}
}
public void restartJob(SundialExecute job, TaskConfDTO taskConfDTO) {
ScheduledFuture<?> future = scheduledFutures.get(taskConfDTO.getTaskName());
if (future != null) {
future.cancel(true);
}
ScheduledFuture<?> newFuture = taskScheduler.schedule(job, triggerContext -> new CronTrigger(taskConfDTO.getCron()).nextExecutionTime(triggerContext));
scheduledFutures.put(taskConfDTO.getTaskName(), newFuture);
}
}
这是数据库表设计:
CREATE database if NOT EXISTS `sundial` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `sundial`;
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for task_conf
-- ----------------------------
DROP TABLE IF EXISTS `task_conf`;
CREATE TABLE `task_conf` (
`id` int NOT NULL AUTO_INCREMENT,
`task_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`status` tinyint DEFAULT NULL COMMENT '1 available, 2 unavailable',
`cron` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uni_task_name` (`task_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- ----------------------------
-- Records of task_conf
-- ----------------------------
INSERT INTO `task_conf` VALUES ('1', 'test1', '1', '*/4 * * * * ?');
INSERT INTO `task_conf` VALUES ('2', 'test2', '1', '*/9 * * * * ?');
前端页面,以后再改改样式。
我的MySQL和zookeeper装在本地,然后虚拟机就装个jdk,MySQL客户端,数据库安装在本地物理机,所以虚拟机里没必要装完整的MySQL服务。虚拟机里起三个服务以模拟分布式环境。
启动zookeeper,
虚拟机里启动定时任务服务,只贴了一台机器的截图,
在页面上启用定时任务2,
虚拟机里可以看到只有一台机器打出了任务2的日志,
至此,定时任务只在集群中的一台机器上执行,且执行成功。
更多推荐
所有评论(0)