企业级应用如何用 Apache DolphinScheduler 有针对性地进行告警插件开发?
点击蓝字 关注我们作者 | 刘宇星Apache DolphinScheduler的2.0.1版本加入了插件化架构改进,将任务、告警组件、数据源、资源存储、注册中心等都将被设计为扩展点,以此来提高 Apache DolphinScheduler 本身的灵活性和友好性。在企业级应用中不同公司的告警需求可能各有不同,针对性的告警插件开发可以很好地解决这一痛点。版本:3.1.2告警插件开发先来看下aler
点击蓝字 关注我们
作者 | 刘宇星
Apache DolphinScheduler的2.0.1版本加入了插件化架构改进,将任务、告警组件、数据源、资源存储、注册中心等都将被设计为扩展点,以此来提高 Apache DolphinScheduler 本身的灵活性和友好性。在企业级应用中不同公司的告警需求可能各有不同,针对性的告警插件开发可以很好地解决这一痛点。
版本:3.1.2
告警插件开发
先来看下alert目录的结构
dolphinscheduler-alert-api
该模块是 ALERT SPI 的核心模块,该模块定义了告警插件扩展的接口以及一些基础代码,其中 AlertChannel 和 AlertChannelFactory 是告警插件开发需要实现的接口类
dolphinscheduler-alert-plugins
该模块包含了官方提供的告警插件,目前我们已经支持数十种插件,如 Email、DingTalk、Script等
dolphinscheduler-alert-server
告警服务模块,主要功能包括注册告警插件,Netty告警消息发送等
本文以官方的http告警插件为例讲解如何进行插件开发
首先明确需求,http告警插件需要通过http发送请求,发送请求首先需要确定哪些参数.在
HttpAlertConstants
可以看到有定义一些相关参数
package org.apache.dolphinscheduler.plugin.alert.http;
public final class HttpAlertConstants {
public static final String URL = "$t('url')";
public static final String NAME_URL = "url";
public static final String HEADER_PARAMS = "$t('headerParams')";
public static final String NAME_HEADER_PARAMS = "headerParams";
...........................省略多余代码
private HttpAlertConstants() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}
}
对应此处告警实例需要填写的参数
其中 $t('url') 样式的参数可以通过编辑
dolphinscheduler-ui/src/locales/zh_CN/security.ts
添加对应的参数,前端收到后会自动替换,同样的英文字典也需要替换,不然切换英文时会报错
在
HttpAlertChannelFactory
需要实现AlertChannelFactory
并实现它的方法name
,params
和create
。其中InputParam.newBuilder
的第一个参数是显示的值,第二个参数是参数名,这里用我们前面在MailParamsConstants
写好的常量。所有参数写好后添加到paramsList
后返回
@AutoService(AlertChannelFactory.class)
public final class HttpAlertChannelFactory implements AlertChannelFactory {
@Override
public String name() {
return "Http";
}
@Override
public List<PluginParams> params() {
InputParam url = InputParam.newBuilder(HttpAlertConstants.NAME_URL, HttpAlertConstants.URL)
.setPlaceholder("input request URL")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam headerParams = InputParam.newBuilder(HttpAlertConstants.NAME_HEADER_PARAMS, HttpAlertConstants.HEADER_PARAMS)
.setPlaceholder("input request headers as JSON format ")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam bodyParams = InputParam.newBuilder(HttpAlertConstants.NAME_BODY_PARAMS, HttpAlertConstants.BODY_PARAMS)
.setPlaceholder("input request body as JSON format ")
.addValidate(Validate.newBuilder()
.setRequired(false)
.build())
.build();
...........................省略多余代码
return Arrays.asList(url, requestType, headerParams, bodyParams, contentField);
}
@Override
public AlertChannel create() {
return new HttpAlertChannel();
}
}
在
HttpAlertChannel
需要实现AlertChannel
并实现process
方法,其中alertInfo.getAlertData().getAlertParams()
可以拿到在创建告警实例时填写的参数,在此处编写相关代码发送请求后,需要返回AlertResult
对象用来标记请求发送or失败
public final class HttpAlertChannel implements AlertChannel {
@Override
public AlertResult process(AlertInfo alertInfo) {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> paramsMap = alertInfo.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "http params is null");
}
return new HttpSender(paramsMap).send(alertData.getContent());
}
}
至此插件开发就完成的,是不是很简单:)设计优秀架构合理的代码就应该是这样优雅高效解耦合. 完成以上开发后,启动告警服务,就可以在添加告警实例时选择对应的插件了。
源码解读
在启动告警服务时,可以在日志看到有注册告警插件的信息
以此为切入口来探索插件实现的相关代码
在dolphinscheduler-alert-server的
AlertPluginManager
的 installPlugin 方法可以看到注册告警插件的内容,这里先获取所有实现了AlertChannelFactory.class
的类,遍历后获取AlertChannel
的实例,添加到数据库和channelKeyedById
Map
private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
@EventListener
public void installPlugin(ApplicationReadyEvent readyEvent) {
PrioritySPIFactory<AlertChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class);
for (Map.Entry<String, AlertChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
String name = entry.getKey();
AlertChannelFactory factory = entry.getValue();
logger.info("Registering alert plugin: {} - {}", name, factory.getClass());
final AlertChannel alertChannel = factory.create();
logger.info("Registered alert plugin: {} - {}", name, factory.getClass());
final List<PluginParams> params = new ArrayList<>(factory.params());
params.add(0, warningTypeParams);
final String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
final PluginDefine pluginDefine = new PluginDefine(name, PluginType.ALERT.getDesc(), paramsJson);
final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
channelKeyedById.put(id, alertChannel);
}
}
完成插件的开发和注册后,需要有个轮询线程来遍历查询需要发送的消息和完成发送的动作,在
AlertSenderService
的run
方法完成了这些
@Override
public void run() {
logger.info("alert sender started");
while (!ServerLifeCycleManager.isStopped()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
AlertServerMetrics.registerPendingAlertGauge(alerts::size);
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
}
}
}
关键方法是
this.send(alerts)
,这里遍历Alert
后获取告警插件的实例集合,在this.alertResultHandler(instance, alertData)
传入插件实例对象和告警参数,最后更新这条告警消息的状态
public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
// get alert group from alert
int alertId = Optional.ofNullable(alert.getId()).orElse(0);
int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) {
logger.error("send alert msg fail,no bind plugin instance.");
List<AlertResult> alertResults = Lists.newArrayList(new AlertResult("false",
"no bind plugin instance"));
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
continue;
}
AlertData alertData = AlertData.builder()
.id(alertId)
.content(alert.getContent())
.log(alert.getLog())
.title(alert.getTitle())
.warnType(alert.getWarningType().getCode())
.alertType(alert.getAlertType().getCode())
.build();
int sendSuccessCount = 0;
List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))
? AlertStatus.EXECUTION_SUCCESS
: AlertStatus.EXECUTION_FAILURE;
alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId,
instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
sendSuccessCount++;
AlertServerMetrics.incAlertSuccessCount();
} else {
AlertServerMetrics.incAlertFailCount();
}
alertResults.add(alertResult);
}
}
AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
if (sendSuccessCount == 0) {
alertStatus = AlertStatus.EXECUTION_FAILURE;
} else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
}
alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
}
}
在
alertResultHandler
用alertPluginManager.getAlertChannel(instance.getPluginDefineId())
获取AlertChannel
实例.还记得前面注册告警插件时往channelKeyedById
里put的AlertChannel
实例的动作吗?
public Optional<AlertChannel> getAlertChannel(int id) {
return Optional.ofNullable(channelKeyedById.get(id));
}
然后构建
AlertInfo
对象,通过CompletableFuture.supplyAsync()
来异步回调执行alertChannel.process(alertInfo)
,用future.get()
获得回调执行返回的AlertResult
再return
private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
String pluginInstanceName = instance.getInstanceName();
int pluginDefineId = instance.getPluginDefineId();
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
if (!alertChannelOptional.isPresent()) {
String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
pluginInstanceName,
pluginDefineId);
logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
return new AlertResult("false", message);
}
AlertChannel alertChannel = alertChannelOptional.get();
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();
if (paramsMap != null) {
instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
}
WarningType warningType = WarningType.of(instanceWarnType);
if (warningType == null) {
String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
return new AlertResult("false", message);
}
boolean sendWarning = false;
switch (warningType) {
case ALL:
sendWarning = true;
break;
case SUCCESS:
if (alertData.getWarnType() == WarningType.SUCCESS.getCode()) {
sendWarning = true;
}
break;
case FAILURE:
if (alertData.getWarnType() == WarningType.FAILURE.getCode()) {
sendWarning = true;
}
break;
default:
}
if (!sendWarning) {
logger.info(
"Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}",
pluginInstanceName, warningType.getCode(), alertData.getWarnType());
return null;
}
AlertInfo alertInfo = AlertInfo.builder()
.alertData(alertData)
.alertParams(paramsMap)
.alertPluginInstanceId(instance.getId())
.build();
int waitTimeout = alertConfig.getWaitTimeout();
try {
AlertResult alertResult;
if (waitTimeout <= 0) {
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
alertResult = alertChannel.closeAlert(alertInfo);
} else {
alertResult = alertChannel.process(alertInfo);
}
} else {
CompletableFuture<AlertResult> future;
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
} else {
future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
}
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
}
if (alertResult == null) {
throw new RuntimeException("Alert result cannot be null");
}
return alertResult;
} catch (InterruptedException e) {
logger.error("send alert error alert data id :{},", alertData.getId(), e);
Thread.currentThread().interrupt();
return new AlertResult("false", e.getMessage());
} catch (Exception e) {
logger.error("send alert error alert data id :{},", alertData.getId(), e);
return new AlertResult("false", e.getMessage());
}
}
综上描述,可以画出注册插件和发送消息的时序图
以上就是告警插件的主要实现代码,是不是发现源码看下来也没有发现多高深和复杂:)所以多看看源码吧,以后你也可以写出这样优秀的开源软件来贡献开源
参考链接:
[Feature] Alert Plugin Design · Issue #3049 · apache/dolphinscheduler (https://github.com/apache/dolphinscheduler/issues/3049)
alert (https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/contribute/backend/spi/alert.html)
参与贡献
随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。
参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:
贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。
社区汇总了以下适合新手的问题列表(Good First Issue):https://github.com/apache/dolphinscheduler/contribute
非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22+q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22
如何参与贡献链接:https://github.com/apache/dolphinscheduler/blob/8944fdc62295883b0fa46b137ba8aee4fde9711a/docs/docs/en/contribute/join/contribute.md
来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。
参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。
添加社区小助手微信(Leonard-ds)
添加小助手微信时请说明想参与贡献。
来吧,开源社区非常期待您的参与。
< 🐬🐬 >
精彩活动推荐
汽车行业走在了数字化革命浪潮的前列。大数据和 AI 技术的日益成熟,让汽车行业面对着动辄上百万的日活数据,二调度系统助力汽车数字化平台数据调度重塑着未来汽车的面貌,其重要作用不言而喻。
Apache DolphinScheduler 作为国内外多家知名车企数据平台的核心调度系统,它是如何帮助车企迎接数字化时代新挑战的?如何辅助重塑未来汽车的新面貌?欢迎大家关注即将到来的 Apache DolphinScheduler 汽车行业最佳应用实践专场直播!
直播时间:倒计时 1 小时!2023 年 5 月 23 日 19:00-21:00
预约方式:点击预约,视频号直播不见不散!
点击阅读原文,点亮Star支持我们哟
更多推荐
所有评论(0)