相关文章:

[分布式监控CAT] Client端源码解析
[分布式监控CAT] Server端源码解析——消息消费\报表处理

前言

本文主要讲解CAT-Server(包括Cat-home\Cat-consumer等模块),从Cat-Server端的概述到具体初始化源码分析,后续文章会继续从源码入手进行分析。

Server端

(Cat-consumer 用于实时分析从客户端提供的数据\Cat-home 作为用户给用户提供展示的控制端
,并且Cat-home做展示时,通过对Cat-Consumer的调用获取其他节点的数据,将所有数据汇总展示)

consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色

**Client端 **
(Cat-client 提供给业务以及中间层埋点的底层SDK)

相关文章:
[分布式监控CAT] Server端源码解析——初始化
[分布式监控CAT] Client端源码解析
[分布式监控CAT] Server端源码解析——消息消费\报表处理

Server端概述

(Cat-consumer 用于实时分析从客户端提供的数据\Cat-home 作为用户给用户提供展示的控制端
,并且Cat-home做展示时,通过对Cat-Consumer的调用获取其他节点的数据,将所有数据汇总展示)

consumer、home以及路由中心都是部署在一起的,每个服务端节点都可以充当任何一个角色

这里写图片描述

CAT服务端在整个实时处理中,基本上实现了全异步化处理:

  • 消息消费基于Netty的NIO实现(Netty-Server)。
  • 消息消费到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发(异步消费处理)。
  • 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。(每报表每线程,分别按照自己的规则解析消费这个消息,并且可以动态控制对某种报表类型的处理线程个数)
  • 消息(原始的消息logView)存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。

服务端初始化

Servlet容器加载、启动

CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假设使用tomcat容器)中的方式部署启动。
熟悉servlet容器的同学应该知道,容器启动时会读取每个Context(可理解为web工程)中的web.xml然后启动Servlet等其他组件。

在cat-home模块中的web.xml中可以看到,除了容器默认的Servlet之外,tomcat启动时会启动CatServlet、MVC这两个Servlet(因为load-on-startup>0,也就是会调用init方法初始化):

<web-app>

<filter>...</filter>

<servlet>
		<servlet-name>cat-servlet</servlet-name>
		<servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
		<load-on-startup>1</load-on-startup>
	</servlet>
	<servlet>
		<servlet-name>mvc-servlet</servlet-name>
		<servlet-class>org.unidal.web.MVC</servlet-class>
		<init-param>
			<param-name>cat-client-xml</param-name>
			<param-value>client.xml</param-value>
		</init-param>
		<init-param>
			<param-name>init-modules</param-name>
			<param-value>false</param-value>
		</init-param>
		<load-on-startup>2</load-on-startup>
	</servlet>

<filter-mapping>...</filter-mapping>
<servlet-mapping>...</servlet-mapping>
<jsp-config>...</jsp-config>

</web-app>

com.dianping.cat.servlet.CatServlet

按照web.xml中Servlet的加载顺序CatServlet会优先于MVC完成初始化。
CatServlet的逻辑基本可以概括为如下两条线:

CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) 
			——>com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)
				——>TCPSocketReceiver(netty服务器)

CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) 
		——>com.dianping.cat.***Module.execute(ModuleContext ctx)(完成各个模块的初始化)

com.dianping.cat.servlet.CatServlet.init(ServletConfig servletConfig)

public void init(ServletConfig config) throws ServletException {
		super.init(config);

		try {//1.plexus IOC容器初始化(根据components.xml的设定完成IOC初始化)
			if (m_container == null) {
				m_container = ContainerLoader.getDefaultContainer();
			}
			//2.用来打印日志的m_logger对象实例化(根据plexus.xml设定完成实例化)
			m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
			      getClass().getName());
			//3.初始化CAT-Server必备的组件模块:cat-home\cat-consumer\cat-core
			initComponents(config);
		} catch (Exception e) {
			if (m_logger != null) {
				m_logger.error("Servlet initializing failed. " + e, e);
			} else {
				System.out.println("Servlet initializing failed. " + e);
				e.printStackTrace(System.out);
			}

			throw new ServletException("Servlet initializing failed. " + e, e);
		}
	}

进入initComponents(config); 我们继续看下为了启动server服务,各个cat-*模块如何初始化:

com.dianping.cat.servlet.CatServlet.initComponents(ServletConfig servletConfig)

	@Override
	protected void initComponents(ServletConfig servletConfig) throws ServletException {
		try {
		//ModuleContext ctx这个对象里主要作用:
		//1.持有 plexus IOC 容器的引用;
		//2.持有 logger对象引用,用来打日志。
		//3.持有 需要使用到的配置文件路径。
		//比如:cat-server-config-file=\data\appdatas\cat\server.xml 
		//cat-client-config-file=\data\appdatas\cat\client.xml

			ModuleContext ctx = new DefaultModuleContext(getContainer());
			ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
			File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
			File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");

			ctx.setAttribute("cat-client-config-file", clientXmlFile);
			ctx.setAttribute("cat-server-config-file", serverXmlFile);
			//通过查找启动cat-home必要的模块,然后依次初始化各个模块。
			initializer.execute(ctx);
		} catch (Exception e) {
			m_exception = e;
			System.err.println(e);
			throw new ServletException(e);
		}
	}

org.unidal.initialization.DefaultModuleInitializer.execute(…). 执行各个模块的初始化

   @Override
   public void execute(ModuleContext ctx) {
   
   //我们的topLevelModule是cat-home模块,通过这个模块去查找需要依赖的其他模块并初始化他们。
      Module[] modules = m_manager.getTopLevelModules();
      execute(ctx, modules);
      }


   @Override
   public void execute(ModuleContext ctx, Module... modules) {
      Set<Module> all = new LinkedHashSet<Module>();

      info(ctx, "Initializing top level modules:");

      for (Module module : modules) {
         info(ctx, "   " + module.getClass().getName());
      }

      try {
      //1.根据顶层Module获取到下层所有依赖到的modules,并分别调用他们的setup方法
         expandAll(ctx, modules, all);
      //2.依次调用module实现类的execute方法
         for (Module module : all) {
            if (!module.isInitialized()) {
               executeModule(ctx, module, m_index++);
            }
         }
      } catch (Exception e) {
         throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
      }
   }

   private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
      if (modules != null) {
         for (Module module : modules) {
            expandAll(ctx, module.getDependencies(ctx), all);

            if (!all.contains(module)) {
               if (module instanceof AbstractModule) {
                  ((AbstractModule) module).setup(ctx);//调用各个module实现类的setup
               }
	//all 最终元素以及顺序:
	//CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule

               all.add(module);
            }
         }
      }
   }

我们看到cat-home模块是一个顶层模块,接着根据这个模块找到其他依赖模块
(CatClientModule\CatConsumerModule\CatCoreModule),并且依次调用setup方法,解析依次调用模块的execute方法完成初始化。

Modules之间的设计使用了典型的模板模式。
这里写图片描述.

模块依赖关系:
null<——CatClientModule<——CatClientModule<——CatCoreModule<——CatConsumerModule<——CatHomeModule

接着着重看一下子类 CatHomeModule的setup的实现。注意除了这个子类,Module的子类steup()方法为空
com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)

@Override
	protected void setup(ModuleContext ctx) throws Exception {
		File serverConfigFile = ctx.getAttribute("cat-server-config-file");//获取server.xml文件的路径
		//通过 plexus IOC 初始化一个 ServerConfigManager bean
		ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
		//通过 plexus IOC 初始化一个 TcpSocketReceiver bean
		final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
		//加载\...\server.xml中的配置
		serverConfigManager.initialize(serverConfigFile);
		//启动TCPSocketReceiver,就是一个典型的 netty 事件驱动服务器,用来接收客户端的TCP长连接请求
		messageReceiver.init();
        //增加一个进程观察者,在这个JVM关闭时回调
		Runtime.getRuntime().addShutdownHook(new Thread() {

			@Override
			public void run() {
				messageReceiver.destory();
			}
		});
	}

各个模块的启动,executeModule
各个模块setup就说到这里,setup完成后,会依次调用module.execute(…)用来完成各个模块的启动。

依次调用:
CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule.其中只有CatClientModule、CatHomeModule实现了有效的execute方法。

com.dianping.cat.CatClientModule.execute(ModuleContext ctx)
注意:这里的客户端是用来监控服务端的,具体client的解析可以参考:CAT客户端解析

@Override
	protected void execute(final ModuleContext ctx) throws Exception {
		ctx.info("Current working directory is " + System.getProperty("user.dir"));

		// initialize milli-second resolution level timer
		MilliSecondTimer.initialize();

		// tracking thread start/stop
        // Threads用来对线程做管理的类。这里默认给每个新建的线程加上监听器或者说是观察者
		Threads.addListener(new CatThreadListener(ctx));

		// warm up Cat: setContainer
		Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

		// bring up TransportManager:实例化这个类
		ctx.lookup(TransportManager.class);
		
        //ClientConfigManager对象是加载了client.xml的客户端配置管理对象。
        //客户端的解析不进行展开,请看之前写的《分布式监控CAT源码解析——cat-client》
		ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
		if (clientConfigManager.isCatEnabled()) {
			StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
			Threads.forGroup("cat").start(statusUpdateTask);
			LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

		}
	}

com.dianping.cat.CatHomeModule.execute(ModuleContext ctx)
CatHomeModule涉及很多可说的,此处暂时不做展开,继续按照Servlet启动的流程讲解。

	@Override
	protected void execute(ModuleContext ctx) throws Exception {
		ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
		//初始化MessageConsumer子类RealtimeConsumer,不仅实例化这个类MessageConsumer对象,还会把这个类中的成员全部实例化
		//		<plexus>
		//	    <components>
		//	        <component>
		//	            <role>com.dianping.cat.analysis.MessageConsumer</role>
		//	            <implementation>com.dianping.cat.analysis.RealtimeConsumer</implementation>
		//	            <requirements>
		//	                <requirement>
		//	                    <role>com.dianping.cat.analysis.MessageAnalyzerManager</role>
		//	                </requirement>
		//	                <requirement>
		//	                    <role>com.dianping.cat.statistic.ServerStatisticManager</role>
		//	                </requirement>
		//	                <requirement>
		//	                    <role>com.dianping.cat.config.server.BlackListManager</role>
		//	                </requirement>
		//	            </requirements>
		//	        </component>

		ctx.lookup(MessageConsumer.class);

		ConfigReloadTask configReloadTask = ctx.lookup(ConfigReloadTask.class);
		Threads.forGroup("cat").start(configReloadTask);

		if (serverConfigManager.isJobMachine()) {
			DefaultTaskConsumer taskConsumer = ctx.lookup(DefaultTaskConsumer.class);

			Threads.forGroup("cat").start(taskConsumer);
		}

		if (serverConfigManager.isAlertMachine()) {//如果当前结点开启了告警功能,则对每种报表启动一个daemon线程。1分钟检查一次
			BusinessAlert metricAlert = ctx.lookup(BusinessAlert.class);
			NetworkAlert networkAlert = ctx.lookup(NetworkAlert.class);
			DatabaseAlert databaseAlert = ctx.lookup(DatabaseAlert.class);
			SystemAlert systemAlert = ctx.lookup(SystemAlert.class);
			ExceptionAlert exceptionAlert = ctx.lookup(ExceptionAlert.class);
			FrontEndExceptionAlert frontEndExceptionAlert = ctx.lookup(FrontEndExceptionAlert.class);
			HeartbeatAlert heartbeatAlert = ctx.lookup(HeartbeatAlert.class);
			ThirdPartyAlert thirdPartyAlert = ctx.lookup(ThirdPartyAlert.class);
			ThirdPartyAlertBuilder alertBuildingTask = ctx.lookup(ThirdPartyAlertBuilder.class);
			AppAlert appAlert = ctx.lookup(AppAlert.class);
			WebAlert webAlert = ctx.lookup(WebAlert.class);
			TransactionAlert transactionAlert = ctx.lookup(TransactionAlert.class);
			EventAlert eventAlert = ctx.lookup(EventAlert.class);
			StorageSQLAlert storageDatabaseAlert = ctx.lookup(StorageSQLAlert.class);
			StorageCacheAlert storageCacheAlert = ctx.lookup(StorageCacheAlert.class);

			Threads.forGroup("cat").start(networkAlert);
			Threads.forGroup("cat").start(databaseAlert);
			Threads.forGroup("cat").start(systemAlert);
			Threads.forGroup("cat").start(metricAlert);
			Threads.forGroup("cat").start(exceptionAlert);
			Threads.forGroup("cat").start(frontEndExceptionAlert);
			Threads.forGroup("cat").start(heartbeatAlert);
			Threads.forGroup("cat").start(thirdPartyAlert);
			Threads.forGroup("cat").start(alertBuildingTask);
			Threads.forGroup("cat").start(appAlert);
			Threads.forGroup("cat").start(webAlert);
			Threads.forGroup("cat").start(transactionAlert);
			Threads.forGroup("cat").start(eventAlert);
			Threads.forGroup("cat").start(storageDatabaseAlert);
			Threads.forGroup("cat").start(storageCacheAlert);
		}

		final MessageConsumer consumer = ctx.lookup(MessageConsumer.class);
		Runtime.getRuntime().addShutdownHook(new Thread() {

			@Override
			public void run() {
				consumer.doCheckpoint();
			}
		});
	}

至此,CatServlet初始化完成了,接下来会初始化org.unidal.web.MVC这个Servlet。
我们接着看一下另外一个Servlet:mvc-servlet

org.unidal.web.MVC

MVC这个Servlet继承了AbstractContainerServlet,与CatServlet非常类似,均是AbstractContainerServlet 的实现类。这个Servlet顾名思义就是用来处理请求的,类似Spring中的DispatcherServlet,集中分配进入的请求到对应的Controller。

public void init(ServletConfig config) throws ServletException {…}
与CatServelet一样,均继承自父类:

public void init(ServletConfig config) throws ServletException {
		super.init(config);

		try {
			if (m_container == null) {
			//DefaultPlexusContainer m_container 是单例对象,在CATServlet中已经完成初始化了
				m_container = ContainerLoader.getDefaultContainer();
			}

			m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
			      getClass().getName());

			initComponents(config);
		} ......

org.unidal.web.MVC.initComponents(ServletConfig config) throws Exception

  @Override
   protected void initComponents(ServletConfig config) throws Exception {
	  // /cat
      String contextPath = config.getServletContext().getContextPath();
      // /cat
      String path = contextPath == null || contextPath.length() == 0 ? "/" : contextPath;

      getLogger().info("MVC is starting at " + path);
//使用client.xml初始化代表CATClient的com.dianping.cat.Cat对象(如果CAT未被初始化)。
      initializeCat(config);
      initializeModules(config);

      m_handler = lookup(RequestLifecycle.class, "mvc");
      m_handler.setServletContext(config.getServletContext());

      config.getServletContext().setAttribute(ID, this);
      getLogger().info("MVC started at " + path);
   }

至此,容器启动成功,http://localhost:2281/cat/r 进入页面。

接下来,我们详细分解CAT-Server的核心功能实现,请看下一篇。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐