点击蓝字 关注我们

a22c9d8bdde9db52666e2d6a33b42d4e.jpeg

作者 | 刘宇星

Apache DolphinScheduler的2.0.1版本加入了插件化架构改进,将任务、告警组件、数据源、资源存储、注册中心等都将被设计为扩展点,以此来提高 Apache DolphinScheduler 本身的灵活性和友好性。在企业级应用中不同公司的告警需求可能各有不同,针对性的告警插件开发可以很好地解决这一痛点。

版本:3.1.2

告警插件开发

先来看下alert目录的结构

91bdbf1e7c547ca06fcebb4a5855ba4f.png

  • dolphinscheduler-alert-api

  • 该模块是 ALERT SPI 的核心模块,该模块定义了告警插件扩展的接口以及一些基础代码,其中 AlertChannel 和 AlertChannelFactory 是告警插件开发需要实现的接口类

  • dolphinscheduler-alert-plugins

  • 该模块包含了官方提供的告警插件,目前我们已经支持数十种插件,如 Email、DingTalk、Script等

  • dolphinscheduler-alert-server

  • 告警服务模块,主要功能包括注册告警插件,Netty告警消息发送等

本文以官方的http告警插件为例讲解如何进行插件开发

  • 首先明确需求,http告警插件需要通过http发送请求,发送请求首先需要确定哪些参数.在 HttpAlertConstants 可以看到有定义一些相关参数

package org.apache.dolphinscheduler.plugin.alert.http;
public final class HttpAlertConstants {
    public static final String URL = "$t('url')";

    public static final String NAME_URL = "url";

    public static final String HEADER_PARAMS = "$t('headerParams')";

    public static final String NAME_HEADER_PARAMS = "headerParams";

...........................省略多余代码

    private HttpAlertConstants() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }
}
  • 对应此处告警实例需要填写的参数

529872f87e0c81969cb4efa2f33ba495.png

其中 $t('url') 样式的参数可以通过编辑

dolphinscheduler-ui/src/locales/zh_CN/security.ts

添加对应的参数,前端收到后会自动替换,同样的英文字典也需要替换,不然切换英文时会报错

  • HttpAlertChannelFactory需要实现AlertChannelFactory并实现它的方法name,paramscreate。其中InputParam.newBuilder的第一个参数是显示的值,第二个参数是参数名,这里用我们前面在MailParamsConstants写好的常量。所有参数写好后添加到paramsList后返回

@AutoService(AlertChannelFactory.class)
public final class HttpAlertChannelFactory implements AlertChannelFactory {
    @Override
    public String name() {
        return "Http";
    }
    @Override
    public List<PluginParams> params() {
        InputParam url = InputParam.newBuilder(HttpAlertConstants.NAME_URL, HttpAlertConstants.URL)
                                   .setPlaceholder("input request URL")
                                   .addValidate(Validate.newBuilder()
                                                        .setRequired(true)
                                                        .build())
                                   .build();
        InputParam headerParams = InputParam.newBuilder(HttpAlertConstants.NAME_HEADER_PARAMS, HttpAlertConstants.HEADER_PARAMS)
                                            .setPlaceholder("input request headers as JSON format ")
                                            .addValidate(Validate.newBuilder()
                                                                 .setRequired(true)
                                                                 .build())
                                            .build();
        InputParam bodyParams = InputParam.newBuilder(HttpAlertConstants.NAME_BODY_PARAMS, HttpAlertConstants.BODY_PARAMS)
                                          .setPlaceholder("input request body as JSON format ")
                                          .addValidate(Validate.newBuilder()
                                                               .setRequired(false)
                                                               .build())
                                          .build();
...........................省略多余代码
        return Arrays.asList(url, requestType, headerParams, bodyParams, contentField);
    }
    @Override
    public AlertChannel create() {
        return new HttpAlertChannel();
    }
}
  • HttpAlertChannel需要实现AlertChannel并实现process方法,其中alertInfo.getAlertData().getAlertParams()可以拿到在创建告警实例时填写的参数,在此处编写相关代码发送请求后,需要返回AlertResult对象用来标记请求发送or失败

public final class HttpAlertChannel implements AlertChannel {
    @Override
    public AlertResult process(AlertInfo alertInfo) {
        AlertData alertData = alertInfo.getAlertData();
        Map<String, String> paramsMap = alertInfo.getAlertParams();
        if (null == paramsMap) {
            return new AlertResult("false", "http params is null");
        }
        return new HttpSender(paramsMap).send(alertData.getContent());
    }
}

至此插件开发就完成的,是不是很简单:)设计优秀架构合理的代码就应该是这样优雅高效解耦合. 完成以上开发后,启动告警服务,就可以在添加告警实例时选择对应的插件了。

2f1b6a6ef8808425ef82b862a67b713c.png

源码解读

在启动告警服务时,可以在日志看到有注册告警插件的信息

5db3332a791ae62a815448b3c89f757f.png

以此为切入口来探索插件实现的相关代码

  • 在dolphinscheduler-alert-server的AlertPluginManager的 installPlugin 方法可以看到注册告警插件的内容,这里先获取所有实现了AlertChannelFactory.class的类,遍历后获取AlertChannel的实例,添加到数据库和channelKeyedByIdMap

private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
    
    @EventListener
    public void installPlugin(ApplicationReadyEvent readyEvent) {
        PrioritySPIFactory<AlertChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class);
        for (Map.Entry<String, AlertChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
            String name = entry.getKey();
            AlertChannelFactory factory = entry.getValue();
            logger.info("Registering alert plugin: {} - {}", name, factory.getClass());
            final AlertChannel alertChannel = factory.create();
            logger.info("Registered alert plugin: {} - {}", name, factory.getClass());
            final List<PluginParams> params = new ArrayList<>(factory.params());
            params.add(0, warningTypeParams);
            final String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
            final PluginDefine pluginDefine = new PluginDefine(name, PluginType.ALERT.getDesc(), paramsJson);
            final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
            channelKeyedById.put(id, alertChannel);
        }
    }
  • 完成插件的开发和注册后,需要有个轮询线程来遍历查询需要发送的消息和完成发送的动作,在AlertSenderServicerun方法完成了这些

@Override
public void run() {
    logger.info("alert sender started");
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            List<Alert> alerts = alertDao.listPendingAlerts();
            AlertServerMetrics.registerPendingAlertGauge(alerts::size);
            this.send(alerts);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
        } catch (Exception e) {
            logger.error("alert sender thread error", e);
        }
    }
}
  • 关键方法是this.send(alerts),这里遍历Alert后获取告警插件的实例集合,在 this.alertResultHandler(instance, alertData)传入插件实例对象和告警参数,最后更新这条告警消息的状态

public void send(List<Alert> alerts) {
    for (Alert alert : alerts) {
        // get alert group from alert
        int alertId = Optional.ofNullable(alert.getId()).orElse(0);
        int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
        List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
        if (CollectionUtils.isEmpty(alertInstanceList)) {
            logger.error("send alert msg fail,no bind plugin instance.");
            List<AlertResult> alertResults = Lists.newArrayList(new AlertResult("false",
                    "no bind plugin instance"));
            alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
            continue;
        }
        AlertData alertData = AlertData.builder()
                .id(alertId)
                .content(alert.getContent())
                .log(alert.getLog())
                .title(alert.getTitle())
                .warnType(alert.getWarningType().getCode())
                .alertType(alert.getAlertType().getCode())
                .build();

        int sendSuccessCount = 0;
        List<AlertResult> alertResults = new ArrayList<>();
        for (AlertPluginInstance instance : alertInstanceList) {
            AlertResult alertResult = this.alertResultHandler(instance, alertData);
            if (alertResult != null) {
                AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))
                        ? AlertStatus.EXECUTION_SUCCESS
                        : AlertStatus.EXECUTION_FAILURE;
                alertDao.addAlertSendStatus(sendStatus, JSONUtils.toJsonString(alertResult), alertId,
                        instance.getId());
                if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
                    sendSuccessCount++;
                    AlertServerMetrics.incAlertSuccessCount();
                } else {
                    AlertServerMetrics.incAlertFailCount();
                }
                alertResults.add(alertResult);
            }
        }
        AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
        if (sendSuccessCount == 0) {
            alertStatus = AlertStatus.EXECUTION_FAILURE;
        } else if (sendSuccessCount < alertInstanceList.size()) {
            alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
        }
        alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
    }
}
  • alertResultHandleralertPluginManager.getAlertChannel(instance.getPluginDefineId())获取AlertChannel实例.还记得前面注册告警插件时往channelKeyedById里put的AlertChannel实例的动作吗?

public Optional<AlertChannel> getAlertChannel(int id) {
    return Optional.ofNullable(channelKeyedById.get(id));
}
  • 然后构建AlertInfo对象,通过CompletableFuture.supplyAsync()来异步回调执行alertChannel.process(alertInfo),用future.get()获得回调执行返回的AlertResult再return

private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
    String pluginInstanceName = instance.getInstanceName();
    int pluginDefineId = instance.getPluginDefineId();
    Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
    if (!alertChannelOptional.isPresent()) {
        String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
                pluginInstanceName,
                pluginDefineId);
        logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
        return new AlertResult("false", message);
    }
    AlertChannel alertChannel = alertChannelOptional.get();

    Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
    String instanceWarnType = WarningType.ALL.getDescp();

    if (paramsMap != null) {
        instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
    }

    WarningType warningType = WarningType.of(instanceWarnType);

    if (warningType == null) {
        String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
        logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
        return new AlertResult("false", message);
    }

    boolean sendWarning = false;
    switch (warningType) {
        case ALL:
            sendWarning = true;
            break;
        case SUCCESS:
            if (alertData.getWarnType() == WarningType.SUCCESS.getCode()) {
                sendWarning = true;
            }
            break;
        case FAILURE:
            if (alertData.getWarnType() == WarningType.FAILURE.getCode()) {
                sendWarning = true;
            }
            break;
        default:
    }

    if (!sendWarning) {
        logger.info(
                "Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}",
                pluginInstanceName, warningType.getCode(), alertData.getWarnType());
        return null;
    }

    AlertInfo alertInfo = AlertInfo.builder()
            .alertData(alertData)
            .alertParams(paramsMap)
            .alertPluginInstanceId(instance.getId())
            .build();
    int waitTimeout = alertConfig.getWaitTimeout();
    try {
        AlertResult alertResult;
        if (waitTimeout <= 0) {
            if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
                alertResult = alertChannel.closeAlert(alertInfo);
            } else {
                alertResult = alertChannel.process(alertInfo);
            }
        } else {
            CompletableFuture<AlertResult> future;
            if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
                future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
            } else {
                future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
            }
            alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
        }
        if (alertResult == null) {
            throw new RuntimeException("Alert result cannot be null");
        }
        return alertResult;
    } catch (InterruptedException e) {
        logger.error("send alert error alert data id :{},", alertData.getId(), e);
        Thread.currentThread().interrupt();
        return new AlertResult("false", e.getMessage());
    } catch (Exception e) {
        logger.error("send alert error alert data id :{},", alertData.getId(), e);
        return new AlertResult("false", e.getMessage());
    }
}

综上描述,可以画出注册插件和发送消息的时序图

f9607f5d5a3a277922c46c93f7c0c601.png

以上就是告警插件的主要实现代码,是不是发现源码看下来也没有发现多高深和复杂:)所以多看看源码吧,以后你也可以写出这样优秀的开源软件来贡献开源

参考链接:

[Feature] Alert Plugin Design · Issue #3049 · apache/dolphinscheduler (https://github.com/apache/dolphinscheduler/issues/3049)

alert (https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/contribute/backend/spi/alert.html)

参与贡献

随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。

3e34e810c4da8aee5a1b32b930cbca4f.png

参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:

87fd62673f5f36b684f94877bcf2f43f.png

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表(Good First Issue):https://github.com/apache/dolphinscheduler/contribute

非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22+q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22

如何参与贡献链接:https://github.com/apache/dolphinscheduler/blob/8944fdc62295883b0fa46b137ba8aee4fde9711a/docs/docs/en/contribute/join/contribute.md

来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。

参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。

ae3125f751462ed21d627b47e64079a0.jpeg

添加社区小助手微信(Leonard-ds) 

添加小助手微信时请说明想参与贡献。

来吧,开源社区非常期待您的参与。

< 🐬🐬 >

精彩活动推荐

汽车行业走在了数字化革命浪潮的前列。大数据和 AI 技术的日益成熟,让汽车行业面对着动辄上百万的日活数据,二调度系统助力汽车数字化平台数据调度重塑着未来汽车的面貌,其重要作用不言而喻。

Apache DolphinScheduler 作为国内外多家知名车企数据平台的核心调度系统,它是如何帮助车企迎接数字化时代新挑战的?如何辅助重塑未来汽车的新面貌?欢迎大家关注即将到来的 Apache DolphinScheduler 汽车行业最佳应用实践专场直播

直播时间:倒计时 1 小时!2023 年 5 月 23 日 19:00-21:00

预约方式:点击预约,视频号直播不见不散!

点击阅读原文,点亮Star支持我们哟

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐