深入理解监控系统——CAT Server端源码解析(初始化启动)
CAT服务端初始化Servlet容器加载、启动CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假设使用tomcat容器)中的方式部署启动。熟悉servlet容器的同学应该知道,容器启动时会读取每个Context(可理解为web工程)中的web.xml然后启动Servlet等其他组件。在cat-home模块中的web.xml中可以看到,除了容器默认的Servlet
相关文章:
[分布式监控CAT] Client端源码解析
[分布式监控CAT] Server端源码解析——消息消费\报表处理
前言
本文主要讲解CAT-Server(包括Cat-home\Cat-consumer等模块),从Cat-Server端的概述到具体初始化源码分析,后续文章会继续从源码入手进行分析。
Server端
(Cat-consumer 用于实时分析从客户端提供的数据\Cat-home 作为用户给用户提供展示的控制端
,并且Cat-home做展示时,通过对Cat-Consumer的调用获取其他节点的数据,将所有数据汇总展示)
consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色
**Client端 **
(Cat-client 提供给业务以及中间层埋点的底层SDK)
相关文章:
[分布式监控CAT] Server端源码解析——初始化
[分布式监控CAT] Client端源码解析
[分布式监控CAT] Server端源码解析——消息消费\报表处理
Server端概述
(Cat-consumer 用于实时分析从客户端提供的数据\Cat-home 作为用户给用户提供展示的控制端
,并且Cat-home做展示时,通过对Cat-Consumer的调用获取其他节点的数据,将所有数据汇总展示)
consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色
CAT服务端在整个实时处理中,基本上实现了全异步化处理:
- 消息消费基于Netty的NIO实现(Netty-Server)。
- 消息消费到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发(异步消费处理)。
- 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。(每报表每线程,分别按照自己的规则解析消费这个消息,并且可以动态控制对某种报表类型的处理线程个数)
- 消息(原始的消息logView)存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。
服务端初始化
Servlet容器加载、启动
CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假设使用tomcat容器)中的方式部署启动。
熟悉servlet容器的同学应该知道,容器启动时会读取每个Context(可理解为web工程)中的web.xml然后启动Servlet等其他组件。
在cat-home模块中的web.xml中可以看到,除了容器默认的Servlet之外,tomcat启动时会启动CatServlet、MVC这两个Servlet(因为load-on-startup>0,也就是会调用init方法初始化):
<web-app>
<filter>...</filter>
<servlet>
<servlet-name>cat-servlet</servlet-name>
<servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet>
<servlet-name>mvc-servlet</servlet-name>
<servlet-class>org.unidal.web.MVC</servlet-class>
<init-param>
<param-name>cat-client-xml</param-name>
<param-value>client.xml</param-value>
</init-param>
<init-param>
<param-name>init-modules</param-name>
<param-value>false</param-value>
</init-param>
<load-on-startup>2</load-on-startup>
</servlet>
<filter-mapping>...</filter-mapping>
<servlet-mapping>...</servlet-mapping>
<jsp-config>...</jsp-config>
</web-app>
com.dianping.cat.servlet.CatServlet
按照web.xml中Servlet的加载顺序CatServlet会优先于MVC完成初始化。
CatServlet的逻辑基本可以概括为如下两条线:
CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...)
——>com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)
——>TCPSocketReceiver(netty服务器)
CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...)
——>com.dianping.cat.***Module.execute(ModuleContext ctx)(完成各个模块的初始化)
com.dianping.cat.servlet.CatServlet.init(ServletConfig servletConfig)
public void init(ServletConfig config) throws ServletException {
super.init(config);
try {//1.plexus IOC容器初始化(根据components.xml的设定完成IOC初始化)
if (m_container == null) {
m_container = ContainerLoader.getDefaultContainer();
}
//2.用来打印日志的m_logger对象实例化(根据plexus.xml设定完成实例化)
m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
getClass().getName());
//3.初始化CAT-Server必备的组件模块:cat-home\cat-consumer\cat-core
initComponents(config);
} catch (Exception e) {
if (m_logger != null) {
m_logger.error("Servlet initializing failed. " + e, e);
} else {
System.out.println("Servlet initializing failed. " + e);
e.printStackTrace(System.out);
}
throw new ServletException("Servlet initializing failed. " + e, e);
}
}
进入initComponents(config); 我们继续看下为了启动server服务,各个cat-*模块如何初始化:
com.dianping.cat.servlet.CatServlet.initComponents(ServletConfig servletConfig)
@Override
protected void initComponents(ServletConfig servletConfig) throws ServletException {
try {
//ModuleContext ctx这个对象里主要作用:
//1.持有 plexus IOC 容器的引用;
//2.持有 logger对象引用,用来打日志。
//3.持有 需要使用到的配置文件路径。
//比如:cat-server-config-file=\data\appdatas\cat\server.xml
//cat-client-config-file=\data\appdatas\cat\client.xml
ModuleContext ctx = new DefaultModuleContext(getContainer());
ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");
ctx.setAttribute("cat-client-config-file", clientXmlFile);
ctx.setAttribute("cat-server-config-file", serverXmlFile);
//通过查找启动cat-home必要的模块,然后依次初始化各个模块。
initializer.execute(ctx);
} catch (Exception e) {
m_exception = e;
System.err.println(e);
throw new ServletException(e);
}
}
org.unidal.initialization.DefaultModuleInitializer.execute(…). 执行各个模块的初始化
@Override
public void execute(ModuleContext ctx) {
//我们的topLevelModule是cat-home模块,通过这个模块去查找需要依赖的其他模块并初始化他们。
Module[] modules = m_manager.getTopLevelModules();
execute(ctx, modules);
}
@Override
public void execute(ModuleContext ctx, Module... modules) {
Set<Module> all = new LinkedHashSet<Module>();
info(ctx, "Initializing top level modules:");
for (Module module : modules) {
info(ctx, " " + module.getClass().getName());
}
try {
//1.根据顶层Module获取到下层所有依赖到的modules,并分别调用他们的setup方法
expandAll(ctx, modules, all);
//2.依次调用module实现类的execute方法
for (Module module : all) {
if (!module.isInitialized()) {
executeModule(ctx, module, m_index++);
}
}
} catch (Exception e) {
throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
}
}
private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
if (modules != null) {
for (Module module : modules) {
expandAll(ctx, module.getDependencies(ctx), all);
if (!all.contains(module)) {
if (module instanceof AbstractModule) {
((AbstractModule) module).setup(ctx);//调用各个module实现类的setup
}
//all 最终元素以及顺序:
//CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule
all.add(module);
}
}
}
}
我们看到cat-home模块是一个顶层模块,接着根据这个模块找到其他依赖模块
(CatClientModule\CatConsumerModule\CatCoreModule),并且依次调用setup方法,解析依次调用模块的execute方法完成初始化。
Modules之间的设计使用了典型的模板模式。
.
模块依赖关系:
null<——CatClientModule<——CatClientModule<——CatCoreModule<——CatConsumerModule<——CatHomeModule
接着着重看一下子类 CatHomeModule的setup的实现。注意除了这个子类,Module的子类steup()方法为空
com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)
@Override
protected void setup(ModuleContext ctx) throws Exception {
File serverConfigFile = ctx.getAttribute("cat-server-config-file");//获取server.xml文件的路径
//通过 plexus IOC 初始化一个 ServerConfigManager bean
ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
//通过 plexus IOC 初始化一个 TcpSocketReceiver bean
final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
//加载\...\server.xml中的配置
serverConfigManager.initialize(serverConfigFile);
//启动TCPSocketReceiver,就是一个典型的 netty 事件驱动服务器,用来接收客户端的TCP长连接请求
messageReceiver.init();
//增加一个进程观察者,在这个JVM关闭时回调
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
messageReceiver.destory();
}
});
}
各个模块的启动,executeModule
各个模块setup就说到这里,setup完成后,会依次调用module.execute(…)用来完成各个模块的启动。
依次调用:
CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule.其中只有CatClientModule、CatHomeModule实现了有效的execute方法。
com.dianping.cat.CatClientModule.execute(ModuleContext ctx)
注意:这里的客户端是用来监控服务端的,具体client的解析可以参考:CAT客户端解析
@Override
protected void execute(final ModuleContext ctx) throws Exception {
ctx.info("Current working directory is " + System.getProperty("user.dir"));
// initialize milli-second resolution level timer
MilliSecondTimer.initialize();
// tracking thread start/stop
// Threads用来对线程做管理的类。这里默认给每个新建的线程加上监听器或者说是观察者
Threads.addListener(new CatThreadListener(ctx));
// warm up Cat: setContainer
Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());
// bring up TransportManager:实例化这个类
ctx.lookup(TransportManager.class);
//ClientConfigManager对象是加载了client.xml的客户端配置管理对象。
//客户端的解析不进行展开,请看之前写的《分布式监控CAT源码解析——cat-client》
ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
if (clientConfigManager.isCatEnabled()) {
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
Threads.forGroup("cat").start(statusUpdateTask);
LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms
}
}
com.dianping.cat.CatHomeModule.execute(ModuleContext ctx)
CatHomeModule涉及很多可说的,此处暂时不做展开,继续按照Servlet启动的流程讲解。
@Override
protected void execute(ModuleContext ctx) throws Exception {
ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
//初始化MessageConsumer子类RealtimeConsumer,不仅实例化这个类MessageConsumer对象,还会把这个类中的成员全部实例化
// <plexus>
// <components>
// <component>
// <role>com.dianping.cat.analysis.MessageConsumer</role>
// <implementation>com.dianping.cat.analysis.RealtimeConsumer</implementation>
// <requirements>
// <requirement>
// <role>com.dianping.cat.analysis.MessageAnalyzerManager</role>
// </requirement>
// <requirement>
// <role>com.dianping.cat.statistic.ServerStatisticManager</role>
// </requirement>
// <requirement>
// <role>com.dianping.cat.config.server.BlackListManager</role>
// </requirement>
// </requirements>
// </component>
ctx.lookup(MessageConsumer.class);
ConfigReloadTask configReloadTask = ctx.lookup(ConfigReloadTask.class);
Threads.forGroup("cat").start(configReloadTask);
if (serverConfigManager.isJobMachine()) {
DefaultTaskConsumer taskConsumer = ctx.lookup(DefaultTaskConsumer.class);
Threads.forGroup("cat").start(taskConsumer);
}
if (serverConfigManager.isAlertMachine()) {//如果当前结点开启了告警功能,则对每种报表启动一个daemon线程。1分钟检查一次
BusinessAlert metricAlert = ctx.lookup(BusinessAlert.class);
NetworkAlert networkAlert = ctx.lookup(NetworkAlert.class);
DatabaseAlert databaseAlert = ctx.lookup(DatabaseAlert.class);
SystemAlert systemAlert = ctx.lookup(SystemAlert.class);
ExceptionAlert exceptionAlert = ctx.lookup(ExceptionAlert.class);
FrontEndExceptionAlert frontEndExceptionAlert = ctx.lookup(FrontEndExceptionAlert.class);
HeartbeatAlert heartbeatAlert = ctx.lookup(HeartbeatAlert.class);
ThirdPartyAlert thirdPartyAlert = ctx.lookup(ThirdPartyAlert.class);
ThirdPartyAlertBuilder alertBuildingTask = ctx.lookup(ThirdPartyAlertBuilder.class);
AppAlert appAlert = ctx.lookup(AppAlert.class);
WebAlert webAlert = ctx.lookup(WebAlert.class);
TransactionAlert transactionAlert = ctx.lookup(TransactionAlert.class);
EventAlert eventAlert = ctx.lookup(EventAlert.class);
StorageSQLAlert storageDatabaseAlert = ctx.lookup(StorageSQLAlert.class);
StorageCacheAlert storageCacheAlert = ctx.lookup(StorageCacheAlert.class);
Threads.forGroup("cat").start(networkAlert);
Threads.forGroup("cat").start(databaseAlert);
Threads.forGroup("cat").start(systemAlert);
Threads.forGroup("cat").start(metricAlert);
Threads.forGroup("cat").start(exceptionAlert);
Threads.forGroup("cat").start(frontEndExceptionAlert);
Threads.forGroup("cat").start(heartbeatAlert);
Threads.forGroup("cat").start(thirdPartyAlert);
Threads.forGroup("cat").start(alertBuildingTask);
Threads.forGroup("cat").start(appAlert);
Threads.forGroup("cat").start(webAlert);
Threads.forGroup("cat").start(transactionAlert);
Threads.forGroup("cat").start(eventAlert);
Threads.forGroup("cat").start(storageDatabaseAlert);
Threads.forGroup("cat").start(storageCacheAlert);
}
final MessageConsumer consumer = ctx.lookup(MessageConsumer.class);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
consumer.doCheckpoint();
}
});
}
至此,CatServlet初始化完成了,接下来会初始化org.unidal.web.MVC这个Servlet。
我们接着看一下另外一个Servlet:mvc-servlet
org.unidal.web.MVC
MVC这个Servlet继承了AbstractContainerServlet,与CatServlet非常类似,均是AbstractContainerServlet 的实现类。这个Servlet顾名思义就是用来处理请求的,类似Spring中的DispatcherServlet,集中分配进入的请求到对应的Controller。
public void init(ServletConfig config) throws ServletException {…}
与CatServelet一样,均继承自父类:
public void init(ServletConfig config) throws ServletException {
super.init(config);
try {
if (m_container == null) {
//DefaultPlexusContainer m_container 是单例对象,在CATServlet中已经完成初始化了
m_container = ContainerLoader.getDefaultContainer();
}
m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
getClass().getName());
initComponents(config);
} ......
org.unidal.web.MVC.initComponents(ServletConfig config) throws Exception
@Override
protected void initComponents(ServletConfig config) throws Exception {
// /cat
String contextPath = config.getServletContext().getContextPath();
// /cat
String path = contextPath == null || contextPath.length() == 0 ? "/" : contextPath;
getLogger().info("MVC is starting at " + path);
//使用client.xml初始化代表CATClient的com.dianping.cat.Cat对象(如果CAT未被初始化)。
initializeCat(config);
initializeModules(config);
m_handler = lookup(RequestLifecycle.class, "mvc");
m_handler.setServletContext(config.getServletContext());
config.getServletContext().setAttribute(ID, this);
getLogger().info("MVC started at " + path);
}
至此,容器启动成功,http://localhost:2281/cat/r 进入页面。
接下来,我们详细分解CAT-Server的核心功能实现,请看下一篇。
更多推荐
所有评论(0)