目前已有的分布式配置中心方案,有淘宝的diamond和百度的disconf。但由于公司需要的配置中心的需求比较简单,而且使用的数据库是MongoDB,而不像disconf需要mysql,于是就手写了一个简单的分布式配置中心。

 

1. 总体设计原则

  •      公用配置不应该分散存在于各应用中,而是应该抽出来,统一存储到一个公用的位置(数据库或zookeeper)
  •      对这些公用配置的添加、修改,应该有一个统一的配置管理中心应用来处理(做一个Web应用来增、删、查、改)
  •     当公用的配置变化时,子应用不需要重新部署或重新启动,就能使用新的配置参数

2. 总体流程图

其中,

        配置中心模块就是接下来我们要实现的部分,它通过dubbo开放增加、修改、删除功能的接口,给Web配置管理模块调用。

        子应用指的是需要将配置文件分离出来放到配置中心的模块。

3. 配置文件

pom.xml

	<!-- spring -->
    <dependency>
	    <groupId>org.springframework</groupId>
	    <artifactId>spring-context</artifactId>
	    <version>5.0.7.RELEASE</version>
	</dependency>
    
	<dependency>
	    <groupId>com.101tec</groupId>
	    <artifactId>zkclient</artifactId>
	    <version>0.10</version>
	</dependency>
	
	<!-- mongodb -->
    <dependency>
	    <groupId>org.mongodb</groupId>
	    <artifactId>mongo-java-driver</artifactId>
	    <version>3.4.3</version>
	</dependency>
    
	<!-- spring-data-mongodb -->
	<dependency>
	    <groupId>org.springframework.data</groupId>
	    <artifactId>spring-data-mongodb</artifactId>
	    <version>1.9.4.RELEASE</version>
	</dependency>
	
	<!-- fastjson -->
	<dependency>
	    <groupId>com.alibaba</groupId>
	    <artifactId>fastjson</artifactId>
	    <version>1.2.47</version>
	</dependency>
	
	<!-- log4j -->
	<dependency>
	    <groupId>log4j</groupId>
	    <artifactId>log4j</artifactId>
	    <version>1.2.17</version>
	</dependency>
	
	<!-- dubbo -->
  	<dependency>
	    <groupId>com.alibaba</groupId>
	    <artifactId>dubbo</artifactId>
	    <version>2.6.2</version>
	</dependency>
	
	<!-- zookeeper -->
    <dependency>
	     <groupId>org.apache.zookeeper</groupId>
	     <artifactId>zookeeper</artifactId>
	     <version>3.4.12</version>
 	</dependency>
	<dependency>
	    <groupId>org.apache.curator</groupId>
	    <artifactId>curator-recipes</artifactId>
	    <version>4.0.1</version>
	</dependency>
	
	<!-- dom4j -->
	<dependency>
	    <groupId>dom4j</groupId>
	    <artifactId>dom4j</artifactId>
	    <version>1.6.1</version>
	</dependency>

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mongo="http://www.springframework.org/schema/data/mongo"
	xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
		http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.8.xsd
		http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd
		http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">


	<bean id="propertyConfigurer" class="com.zhuyun.user.util.ProjectDBinfoConfigurer">
		<property name="location">
		<value>configCenter.properties</value>
		</property>
		<property name="fileEncoding" value="utf-8" />
	</bean>
	
	<!-- 自动扫描注解 -->
 	<context:component-scan base-package="com.zhuyun" />
	
	<!-- credentials="${mongo.credentials}" -->
    <mongo:mongo-client id="mongoClient" replica-set="${mongo.replica-set}">
        <mongo:client-options
                connections-per-host="1500"
                threads-allowed-to-block-for-connection-multiplier="1"
                connect-timeout="5000"
                max-wait-time="120000"
                socket-keep-alive="true"
                socket-timeout="0"
                write-concern="ACKNOWLEDGED"           
                read-preference="PRIMARY" />
                <!-- write-concern="acknowledged" -->
    </mongo:mongo-client>    
    
    <mongo:db-factory id="mongoDbFactory" dbname="config_center" mongo-ref="mongoClient" />
    
    <bean id="mongoTypeMapper" class="org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper">
        <constructor-arg name="typeKey">
            <null/>
        </constructor-arg>
    </bean>
    
    <bean id="mongoMappingContext" class="org.springframework.data.mongodb.core.mapping.MongoMappingContext"/>

    <bean id="mongoConverter" class="org.springframework.data.mongodb.core.convert.MappingMongoConverter">
        <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
        <constructor-arg name="mappingContext" ref="mongoMappingContext"/>
        <property name="typeMapper" ref="mongoTypeMapper" />
    </bean>
	
	<bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
		<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
		<constructor-arg name="mongoConverter" ref="mongoConverter"/>
	</bean>	

	<dubbo:application name="config_center" />
	<dubbo:registry protocol="zookeeper" address="${zookeeper.address}" />
	<dubbo:consumer timeout="50000" />
  	
  	<!-- 	具体的实现bean -->
    <bean id="configCenter" class="com.zhuyun.dubbo.impl.ConfigCenterImpl" />
    
    <dubbo:protocol name="dubbo" host="${dubbo.host}" port="${dubbo.port}" threads="1500" queues="3000" accepts="1500"/>
   	<!--  声明需要暴露的服务接口 -->
    <dubbo:service interface="com.zhuyun.dubbo.ConfigCenter" ref="configCenter" />

</beans>

 

configCenter.properties

mongo.replica-set=192.168.10.201:27017
zookeeper.address=192.168.10.201:2181
dubbo.host=127.0.0.1
dubbo.port=20000

 

log4j.properties

#log4j.rootLogger=INFO, stdout, R
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

#log4j.appender.R=org.apache.log4j.RollingFileAppender
#log4j.appender.R.Threshold=INFO
#log4j.appender.R.File=/var/log/newton/push_engine_2.1.log
#log4j.appender.R.Append=true
#log4j.appender.R.MaxFileSize=300MB
#log4j.appender.R.MaxBackupIndex=5
#log4j.appender.R.layout=org.apache.log4j.PatternLayout 
#log4j.appender.R.layout.ConversionPattern=%d %p [%c] - %m%n

 

4.代码部分

4.1 MongoDB接口部分

ConfigDao.java

package com.zhuyun.dao;

import java.util.List;

import com.zhuyun.entity.Config;


public interface ConfigDao {
	public void insert(Config config);
	
	/**
	 * 获取所有配置
	 * @return
	 */
	public List<Config> findAll();
	
	public Config findOne(String config_name);
	
	/**
	 * 结果为1,表示更新成功
	 * @param config
	 * @return
	 */
	public int update(Config config);
	
	/**
	 * 如果config_name存在,则更新
	 * 否则,则插入
	 * @param config
	 * @return 
	 * 			-1	数据库异常
	 * 			1	config_name存在,更新
	 * 			2	config_name不存在,插入
	 */
	public int upsert(Config config);
	
	/**
	 * 结果为 -1,数据库异常
	 * 		0,表示数据不存在
	 *      1,表示删除成功
	 */
	public int remove(String config_name);
}

ConfigDaoImpl.java

package com.zhuyun.dao.impl;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component;

import com.mongodb.WriteResult;
import com.zhuyun.dao.ConfigDao;
import com.zhuyun.entity.Config;

@Component("configDaoImpl")
public class ConfigDaoImpl implements ConfigDao {
	public static Logger LOG = LoggerFactory.getLogger(ConfigDaoImpl.class);
	@Autowired
	public MongoTemplate mongoTemplate;

	@Override
	public void insert(Config config) {
		mongoTemplate.insert(config, "config");
	}

	@Override
	public Config findOne(String config_name) {
		 Config config = mongoTemplate.findOne(Query.query(Criteria.where("config_name").is(config_name)), 
				Config.class, "config");
		 if (config != null) {
			 LOG.info(".... load {} config from MongoDB: {}...", config.getConfig_name(), config.toString());
		}
		 return config;
	}
	
	@Override
	public List<Config> findAll() {
		 List<Config> allConfigs = mongoTemplate.findAll(Config.class, "config");
		 LOG.info(".... load all configs from MongoDB: {}...", allConfigs);
		 return allConfigs;
	}

	@Override
	public int update(Config config) {
		WriteResult result = mongoTemplate.updateFirst(Query.query(Criteria.where("config_name").is(config.getConfig_name())), 
				Update.update("config_content", config.getConfig_content()), "config");
		return result.getN();
	}
	
	@Override
	public int upsert(Config config) {
		 WriteResult result = mongoTemplate.upsert(Query.query(Criteria.where("config_name").is(config.getConfig_name())),
				Update.update("config_content", config.getConfig_content()), 
				"config");
		 int result_code = -1;
		 if (result.getN() <= 0) {
			result_code = -1;
		}
		 if (result.isUpdateOfExisting() == true) {
			result_code = 1;
			LOG.info(".... upsert {} config to MongoDB: {}...", config.getConfig_name(), config.toString());
		}else {
			result_code = 2;
			LOG.info(".... upsert {} config to MongoDB: {}...", config.getConfig_name(), config.toString());
		}
		 return result_code;
	}

	@Override
	public int remove(String config_name) {
		int result_code = -1;
		WriteResult result = mongoTemplate.remove(Query.query(Criteria.where("config_name").is(config_name)), "config");
		 if (result.getN() < 0) {
				result_code = -1;
		 }else if (result.getN() == 0) {
			 	result_code = 0;
			 	LOG.info(".... delete failed, {} config not exist in MongoDB ...", config_name);
		} else {
				result_code = 1;
				LOG.info(".... delete {} config from MongoDB ...", config_name);
		 }
		return result_code;
	}


}

 

4.2 dubbo接口部分

ConfigCenter.java

package com.zhuyun.dubbo;

import com.zhuyun.entity.Config;

public interface ConfigCenter {

	/**
	 * 加载配置,包括加载配置到MongoDB数据库和同步配置到zookeeper
	 * @param config
	 * @return	
	 * 			{"retcode": "601"}	数据库错误
	 * 			{"retcode": "200"} 	该配置在数据库里已存在,则更新该配置,并同步到zookeeper
	 * 			{"retcode": "201"} 	该配置在数据库里不存在,则插入改配置,并同步到zookeeper
	 */
	public String upLoadConfig(Config config);
	
	/**
	 * 获取配置,从MongoDB数据库获取
	 * @param config_name
	 * @return
	 */
	public Config getConfig(String config_name);
	
	/**
	 * 删除配置, 从MongoDB数据库删除该配置,同时删除zookeeper上的该配置节点
	 * @param config_name
	 * @return
	 * 			{"retcode": "601"}	数据库错误
	 * 			{"retcode": "602"} 	该配置在数据库里不存在
	 * 			{"retcode": "200"} 	该配置在数据库里存在,并删除成功,同时删除ZK的相应节点
	 */
	public String removeConfig(String config_name);
}

ConfigCenterImpl.java

package com.zhuyun.dubbo.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.zhuyun.ConfigCenter.ZkConfigUtil;
import com.zhuyun.dao.ConfigDao;
import com.zhuyun.dubbo.ConfigCenter;
import com.zhuyun.entity.Config;
import com.zhuyun.main.Main;

@Component("configCenterImpl")
public class ConfigCenterImpl implements ConfigCenter {
	@Autowired
	private ConfigDao configDao;

	@Override
	public String upLoadConfig(Config config) {
		int result = configDao.upsert(config);
		JSONObject jsonObject = new JSONObject();
		
		if (result == -1) {							//数据库错误
			jsonObject.put("retcode", "601");		
		}else if (result == 1) {					//该配置在数据库里已存在,则更新该配置,并同步到zookeeper
			ZkConfigUtil.syncConfigToZk(Main.zkAddr, config);
			jsonObject.put("retcode", "200");		
		}else if (result == 2) {					//该配置在数据库里不存在,则插入改配置,并同步到zookeeper
			ZkConfigUtil.syncConfigToZk(Main.zkAddr, config);
			jsonObject.put("retcode", "201");		
		}
		return jsonObject.toJSONString();
	}

	@Override
	public Config getConfig(String config_name) {
		return configDao.findOne(config_name);
	}

	@Override
	public String removeConfig(String config_name) {
		int result = configDao.remove(config_name);
		JSONObject jsonObject = new JSONObject();
		
		if (result == -1) {							//数据库错误
			jsonObject.put("retcode", "601");		
		}else if (result == 0) {					//该配置在数据库里不存在
			jsonObject.put("retcode", "602");		
		}else if (result == 1) {					//该配置在数据库里存在,并删除成功,同时删除ZK的相应节点
			ZkConfigUtil.delConfigToZk(Main.zkAddr, config_name);
			jsonObject.put("retcode", "200");		
		}
		return jsonObject.toJSONString();
	}

}

4.3 实体类部分

Config.java

package com.zhuyun.entity;

import java.io.Serializable;

public class Config implements Serializable{
	private static final long serialVersionUID = 1L;
	
	private String config_name;
	private Object config_content;

	public String getConfig_name() {
		return config_name;
	}

	public void setConfig_name(String config_name) {
		this.config_name = config_name;
	}

	public Object getConfig_content() {
		return config_content;
	}

	public void setConfig_content(Object config_content) {
		this.config_content = config_content;
	}

	public Config() {
		super();
	}

	public Config(String config_name, Object config_content) {
		super();
		this.config_name = config_name;
		this.config_content = config_content;
	}

	@Override
	public String toString() {
		return "Config [config_name=" + config_name + ", config_content="
				+ new String((byte[])config_content) + "]";
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result
				+ ((config_content == null) ? 0 : config_content.hashCode());
		result = prime * result
				+ ((config_name == null) ? 0 : config_name.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		Config other = (Config) obj;
		if (config_content == null) {
			if (other.config_content != null)
				return false;
		} else if (!config_content.equals(other.config_content))
			return false;
		if (config_name == null) {
			if (other.config_name != null)
				return false;
		} else if (!config_name.equals(other.config_name))
			return false;
		return true;
	}

}

注:此实体类中,config_content的类型是Object类型, 是为了更好的扩展性,它可以是简单的String类型,也可以是byte[]类型的,这样的话,配置文件就可以转换成byte[]存储在数据库中了。

4.4 工具类

ProjectDBinfoConfigurer.java

package com.zhuyun.user.util;


import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;

public class ProjectDBinfoConfigurer extends PropertyPlaceholderConfigurer 
{	
	private static Map<String, String> ctxPropertiesMap;
	
	private static final Logger LOGGER = LoggerFactory.getLogger(ProjectDBinfoConfigurer.class);
	
	public ProjectDBinfoConfigurer() {
	}

	@Override
	protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException {
		super.processProperties(beanFactoryToProcess, props);
		LOGGER.info("loading outter config file's content: {}", props.keySet());
		ctxPropertiesMap = new HashMap<String, String>();
		for (Object key : props.keySet()) {
			String keyStr = key.toString();
			String value = props.getProperty(keyStr);
			LOGGER.info("property: key={},  value={} ", keyStr, value);
			ctxPropertiesMap.put(keyStr, value);
		}
	}

	public static String getContextProperty(String name) {
		return ctxPropertiesMap.get(name);
	}
}

 

zkClient的序列化类MyZkSerializer.java

package com.zhuyun.ConfigCenter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

public class MyZkSerializer implements ZkSerializer
{
	 @Override
	    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
	        try {
	            ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
	            Object object = inputStream.readObject();
	            inputStream.close();
	            return object;
	        } catch (ClassNotFoundException e) {
	            throw new ZkMarshallingError("Unable to find object class.", e);
	        } catch (IOException e) {
	            throw new ZkMarshallingError(e);
	        }
		 
	    }

	    @Override
	    public byte[] serialize(Object serializable) throws ZkMarshallingError {
	        try {
	            ByteArrayOutputStream byteArrayOS = new ByteArrayOutputStream();
	            ObjectOutputStream stream = new ObjectOutputStream(byteArrayOS);
	            stream.writeObject(serializable);
	            stream.close();
	            return byteArrayOS.toByteArray();
	        } catch (IOException e) {
	            throw new ZkMarshallingError(e);
	        }
	    }
}

 

ZkConfigUtil.java

package com.zhuyun.ConfigCenter;

import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zhuyun.entity.Config;

public class ZkConfigUtil {
	public static Logger LOG = LoggerFactory.getLogger(ZkConfigUtil.class);
	/**
	 * 配置文件同步到zookeeper
	 */
	public static void syncConfigToZk(String zkAddr, Config config){
		ZkClient zk = new ZkClient(zkAddr);
		zk.setZkSerializer(new MyZkSerializer());
		if(!zk.exists("/config-center/" + config.getConfig_name())){
			zk.createPersistent("/config-center/" + config.getConfig_name(),true);
		}
		zk.writeData("/config-center/" + config.getConfig_name(), config.getConfig_content());
		zk.close();
		LOG.info(".... synchronize {} to zookeeper....", config);
	}
	
	/**
	 * 删除zookeeper的配置节点
	 */
	public static void delConfigToZk(String zkAddr, String config_name){
		ZkClient zk = new ZkClient(zkAddr);
		zk.setZkSerializer(new MyZkSerializer());
		if(zk.exists("/config-center/" + config_name)){
			zk.delete("/config-center/" + config_name);
		}
		zk.close();
		LOG.info(".... delete config {} from zookeeper....", config_name);
	}
}

4.5 主类

Main.java

package com.zhuyun.main;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.zhuyun.ConfigCenter.ZkConfigUtil;
import com.zhuyun.dao.ConfigDao;
import com.zhuyun.entity.Config;
import com.zhuyun.user.util.ProjectDBinfoConfigurer;

public class Main {
	public static ConfigDao configDao;
	public static String zkAddr;
	public static List<Config> allConfigs;
	public static Logger LOG = LoggerFactory.getLogger(Main.class);
	
	
	public static void main(String[] args) {
		ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
		
		configDao = (ConfigDao) context.getBean("configDaoImpl");
		zkAddr = ProjectDBinfoConfigurer.getContextProperty("zookeeper.address");
		allConfigs = configDao.findAll();
		
		for (Config config : allConfigs) {
			ZkConfigUtil.syncConfigToZk(zkAddr, config);		//将配置同步到zookeeper
		}
		
		LOG.info(".... config_center start success ....");
		
		while (true) {
			try {
				Thread.sleep(365*24*3600);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

5. 运行结果

启动main方法

使用ZooInspector查看zookeeper上面的dubbo服务

 

通过dubbo客户端调用上传配置文件的接口

配置文件config.xml

<?xml version="1.0" encoding="UTF-8"?>
<push_engine>
	<property>
		<name>hello world</name>
		<age>22</age>
	</property>
</push_engine>

此时,zookeeper相应节点上的信息发生改变

 

MongoDB将配置文件的信息存储成二进制的形式

 

6. 客户端代码

此客户端,指的是前面流程图中的子应用

package com.zhuyun.client.test;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import com.zhuyun.ConfigCenter.MyZkSerializer;
import com.zhuyun.ConfigCenter.ZkConfigUtil;
 
public class ZkGetConfigClient {
	public static ZkConfigUtil zkConfigUtil;
	private Object config_content;

	public Object getConfig() {
		String zkAddr = "192.168.10.201:2181";
		ZkClient zk = new ZkClient(zkAddr);
		zk.setZkSerializer(new MyZkSerializer());
		
		config_content = zk.readData("/config-center/" + "push_engine");
        //具体解析字节数组的过程,根据实际需要而不同
		LoadXML.loadXML(((byte[])config_content));
		
		//监听配置文件修改
		zk.subscribeDataChanges("/config-center/" + "push_engine", new IZkDataListener(){
			@Override
			public void handleDataChange(String arg0, Object arg1)
					throws Exception {
				config_content = arg1;
				System.out.println("监听到配置文件被修改:"+config_content.toString());
				LoadXML.loadXML((byte[])config_content);
			}
 
			@Override
			public void handleDataDeleted(String arg0) throws Exception {
				config_content = null;
				System.out.println("监听到配置文件被删除");
			}
			
		});
		return config_content;
	}
	public static void main(String[] args) {
		ZkGetConfigClient client = new ZkGetConfigClient();
		client.getConfig();

		while (true) {
			try {
				Thread.sleep(365*24*3600);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
	
}

 

至此,一个简单的配置中心就完成了

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐