A Simple Distributed Configuration Center
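Existing distributed configuration center solutions include Taobao's diamond and Baidu's disconf. Our company's requirements for a configuration center are fairly simple, though, and our database is MongoDB rather than the MySQL that disconf requires, so I wrote a simple distributed configuration center by hand.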
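1. Overall Design Principles
- Shared configuration should not be scattered across individual applications. It should be extracted and stored in one common location (a database or zookeeper).
- Adding and modifying shared configuration should go through a single configuration management application (a Web application that creates, deletes, queries, and updates configs).
- When shared configuration changes, sub-applications should pick up the new parameters without being redeployed or restarted.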
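The third principle, picking up new values without a restart, can be illustrated with a minimal sketch. It assumes a hypothetical `HotConfigHolder` class that is not part of this project: application code reads through the holder, and whatever watches the shared store (for example a zookeeper data listener) calls `publish` when the value changes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical holder: keeps the latest config value and notifies listeners on change. */
final class HotConfigHolder {
    private final AtomicReference<String> current = new AtomicReference<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a callback that runs every time a new value is published. */
    void onChange(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Called by the component that watches the shared store. */
    void publish(String newValue) {
        current.set(newValue);
        for (Consumer<String> listener : listeners) {
            listener.accept(newValue);
        }
    }

    /** Application code reads through the holder, so it always sees the latest value. */
    String get() {
        return current.get();
    }

    public static void main(String[] args) {
        HotConfigHolder holder = new HotConfigHolder();
        holder.onChange(v -> System.out.println("config changed: " + v));
        holder.publish("timeout=3000");
        holder.publish("timeout=5000");
        System.out.println("current: " + holder.get());
    }
}
```

Because readers never cache the value themselves, swapping the reference is enough to make a change visible to the running process.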
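2. Overall Flow Diagram
In the diagram:
The configuration center module is the part we implement below. It exposes add, modify, and delete interfaces through dubbo for the Web configuration management module to call.
A sub-application is any module whose configuration files need to be separated out and placed in the configuration center.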
3. Configuration Files
pom.xml
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<!-- mongodb -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.4.3</version>
</dependency>
<!-- spring-data-mongodb -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>1.9.4.RELEASE</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- dubbo -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.6.2</version>
</dependency>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<!-- dom4j -->
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mongo="http://www.springframework.org/schema/data/mongo"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.8.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<bean id="propertyConfigurer" class="com.zhuyun.user.util.ProjectDBinfoConfigurer">
<property name="location">
<value>configCenter.properties</value>
</property>
<property name="fileEncoding" value="utf-8" />
</bean>
<!-- Scan for annotated components -->
<context:component-scan base-package="com.zhuyun" />
<!-- credentials="${mongo.credentials}" -->
<mongo:mongo-client id="mongoClient" replica-set="${mongo.replica-set}">
<mongo:client-options
connections-per-host="1500"
threads-allowed-to-block-for-connection-multiplier="1"
connect-timeout="5000"
max-wait-time="120000"
socket-keep-alive="true"
socket-timeout="0"
write-concern="ACKNOWLEDGED"
read-preference="PRIMARY" />
<!-- write-concern="acknowledged" -->
</mongo:mongo-client>
<mongo:db-factory id="mongoDbFactory" dbname="config_center" mongo-ref="mongoClient" />
<bean id="mongoTypeMapper" class="org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper">
<constructor-arg name="typeKey">
<null/>
</constructor-arg>
</bean>
<bean id="mongoMappingContext" class="org.springframework.data.mongodb.core.mapping.MongoMappingContext"/>
<bean id="mongoConverter" class="org.springframework.data.mongodb.core.convert.MappingMongoConverter">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
<constructor-arg name="mappingContext" ref="mongoMappingContext"/>
<property name="typeMapper" ref="mongoTypeMapper" />
</bean>
<bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
<constructor-arg name="mongoConverter" ref="mongoConverter"/>
</bean>
<dubbo:application name="config_center" />
<dubbo:registry protocol="zookeeper" address="${zookeeper.address}" />
<dubbo:consumer timeout="50000" />
<!-- The concrete implementation bean -->
<bean id="configCenter" class="com.zhuyun.dubbo.impl.ConfigCenterImpl" />
<dubbo:protocol name="dubbo" host="${dubbo.host}" port="${dubbo.port}" threads="1500" queues="3000" accepts="1500"/>
<!-- Declare the service interface to expose -->
<dubbo:service interface="com.zhuyun.dubbo.ConfigCenter" ref="configCenter" />
</beans>
configCenter.properties
mongo.replica-set=192.168.10.201:27017
zookeeper.address=192.168.10.201:2181
dubbo.host=127.0.0.1
dubbo.port=20000
log4j.properties
#log4j.rootLogger=INFO, stdout, R
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
#log4j.appender.R=org.apache.log4j.RollingFileAppender
#log4j.appender.R.Threshold=INFO
#log4j.appender.R.File=/var/log/newton/push_engine_2.1.log
#log4j.appender.R.Append=true
#log4j.appender.R.MaxFileSize=300MB
#log4j.appender.R.MaxBackupIndex=5
#log4j.appender.R.layout=org.apache.log4j.PatternLayout
#log4j.appender.R.layout.ConversionPattern=%d %p [%c] - %m%n
4. Code
4.1 MongoDB Access Layer
ConfigDao.java
package com.zhuyun.dao;
import java.util.List;
import com.zhuyun.entity.Config;
public interface ConfigDao {
public void insert(Config config);
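/**
* Returns all configs.
* @return every config stored in the collection
*/
public List<Config> findAll();
public Config findOne(String config_name);
/**
* A result of 1 means the update succeeded.
* @param config the config to update
* @return the number of documents updated
*/
public int update(Config config);
/**
* Updates the config if config_name exists; otherwise inserts it.
* @param config the config to update or insert
* @return
* -1 database error
* 1 config_name exists, updated
* 2 config_name does not exist, inserted
*/
public int upsert(Config config);
/**
* @return -1 on a database error,
* 0 if the config does not exist,
* 1 if the config was deleted
*/
public int remove(String config_name);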
}
ConfigDaoImpl.java
package com.zhuyun.dao.impl;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component;
import com.mongodb.WriteResult;
import com.zhuyun.dao.ConfigDao;
import com.zhuyun.entity.Config;
@Component("configDaoImpl")
public class ConfigDaoImpl implements ConfigDao {
public static Logger LOG = LoggerFactory.getLogger(ConfigDaoImpl.class);
@Autowired
public MongoTemplate mongoTemplate;
@Override
public void insert(Config config) {
mongoTemplate.insert(config, "config");
}
@Override
public Config findOne(String config_name) {
Config config = mongoTemplate.findOne(Query.query(Criteria.where("config_name").is(config_name)),
Config.class, "config");
if (config != null) {
LOG.info(".... load {} config from MongoDB: {}...", config.getConfig_name(), config.toString());
}
return config;
}
@Override
public List<Config> findAll() {
List<Config> allConfigs = mongoTemplate.findAll(Config.class, "config");
LOG.info(".... load all configs from MongoDB: {}...", allConfigs);
return allConfigs;
}
@Override
public int update(Config config) {
WriteResult result = mongoTemplate.updateFirst(Query.query(Criteria.where("config_name").is(config.getConfig_name())),
Update.update("config_content", config.getConfig_content()), "config");
return result.getN();
}
@Override
public int upsert(Config config) {
WriteResult result = mongoTemplate.upsert(Query.query(Criteria.where("config_name").is(config.getConfig_name())),
Update.update("config_content", config.getConfig_content()),
"config");
int result_code;
if (result.getN() <= 0) {
// Nothing was updated or inserted: report a database error
result_code = -1;
} else if (result.isUpdateOfExisting()) {
result_code = 1;
LOG.info(".... upsert {} config to MongoDB: {}...", config.getConfig_name(), config.toString());
} else {
result_code = 2;
LOG.info(".... upsert {} config to MongoDB: {}...", config.getConfig_name(), config.toString());
}
return result_code;
}
@Override
public int remove(String config_name) {
int result_code = -1;
WriteResult result = mongoTemplate.remove(Query.query(Criteria.where("config_name").is(config_name)), "config");
if (result.getN() < 0) {
result_code = -1;
}else if (result.getN() == 0) {
result_code = 0;
LOG.info(".... delete failed, {} config not exist in MongoDB ...", config_name);
} else {
result_code = 1;
LOG.info(".... delete {} config from MongoDB ...", config_name);
}
return result_code;
}
}
4.2 dubbo接口部分
ConfigCenter.java
package com.zhuyun.dubbo;
import com.zhuyun.entity.Config;
public interface ConfigCenter {
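/**
* Uploads a config: stores it in MongoDB and synchronizes it to zookeeper.
* @param config the config to upload
* @return
* {"retcode": "601"} database error
* {"retcode": "200"} the config already exists in the database; it is updated and synchronized to zookeeper
* {"retcode": "201"} the config does not exist in the database; it is inserted and synchronized to zookeeper
*/
public String upLoadConfig(Config config);
/**
* Gets a config from MongoDB.
* @param config_name the name of the config
* @return the stored config
*/
public Config getConfig(String config_name);
/**
* Deletes a config from MongoDB and deletes its node on zookeeper.
* @param config_name the name of the config
* @return
* {"retcode": "601"} database error
* {"retcode": "602"} the config does not exist in the database
* {"retcode": "200"} the config existed and was deleted, and the corresponding ZK node was deleted
*/
public String removeConfig(String config_name);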
}
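The return-code contract documented on the interface can be summarized as a small mapping. This is only a sketch: `RetcodeSketch` is a hypothetical helper, it builds the JSON string by hand instead of using fastjson, and treating any unexpected DAO result as "601" is my assumption rather than something the interface specifies.

```java
/** Hypothetical helper that mirrors the retcode contract of ConfigCenter. */
final class RetcodeSketch {

    /** Maps the DAO upsert result (-1, 1, 2) to the retcode of upLoadConfig. */
    static String uploadRetcode(int daoResult) {
        switch (daoResult) {
            case 1:
                return "200"; // config existed and was updated
            case 2:
                return "201"; // config did not exist and was inserted
            default:
                return "601"; // -1 (or, by assumption, anything unexpected): database error
        }
    }

    /** Maps the DAO remove result (-1, 0, 1) to the retcode of removeConfig. */
    static String removeRetcode(int daoResult) {
        switch (daoResult) {
            case 1:
                return "200"; // config existed and was deleted
            case 0:
                return "602"; // config does not exist
            default:
                return "601"; // database error
        }
    }

    /** Wraps a retcode in the JSON shape shown in the interface comments. */
    static String toJson(String retcode) {
        return "{\"retcode\":\"" + retcode + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(toJson(uploadRetcode(2)));
        System.out.println(toJson(removeRetcode(0)));
    }
}
```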
ConfigCenterImpl.java
package com.zhuyun.dubbo.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.zhuyun.ConfigCenter.ZkConfigUtil;
import com.zhuyun.dao.ConfigDao;
import com.zhuyun.dubbo.ConfigCenter;
import com.zhuyun.entity.Config;
import com.zhuyun.main.Main;
@Component("configCenterImpl")
public class ConfigCenterImpl implements ConfigCenter {
@Autowired
private ConfigDao configDao;
@Override
public String upLoadConfig(Config config) {
int result = configDao.upsert(config);
JSONObject jsonObject = new JSONObject();
if (result == -1) { // database error
jsonObject.put("retcode", "601");
}else if (result == 1) { // the config already exists: it was updated, now sync it to zookeeper
ZkConfigUtil.syncConfigToZk(Main.zkAddr, config);
jsonObject.put("retcode", "200");
}else if (result == 2) { // the config did not exist: it was inserted, now sync it to zookeeper
ZkConfigUtil.syncConfigToZk(Main.zkAddr, config);
jsonObject.put("retcode", "201");
}
return jsonObject.toJSONString();
}
@Override
public Config getConfig(String config_name) {
return configDao.findOne(config_name);
}
@Override
public String removeConfig(String config_name) {
int result = configDao.remove(config_name);
JSONObject jsonObject = new JSONObject();
if (result == -1) { // database error
jsonObject.put("retcode", "601");
}else if (result == 0) { // the config does not exist in the database
jsonObject.put("retcode", "602");
}else if (result == 1) { // the config existed and was deleted; also delete the corresponding ZK node
ZkConfigUtil.delConfigToZk(Main.zkAddr, config_name);
jsonObject.put("retcode", "200");
}
return jsonObject.toJSONString();
}
}
4.3 实体类部分
Config.java
package com.zhuyun.entity;
import java.io.Serializable;
public class Config implements Serializable{
private static final long serialVersionUID = 1L;
private String config_name;
private Object config_content;
public String getConfig_name() {
return config_name;
}
public void setConfig_name(String config_name) {
this.config_name = config_name;
}
public Object getConfig_content() {
return config_content;
}
public void setConfig_content(Object config_content) {
this.config_content = config_content;
}
public Config() {
super();
}
public Config(String config_name, Object config_content) {
super();
this.config_name = config_name;
this.config_content = config_content;
}
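@Override
public String toString() {
// config_content may be a String, a byte[], or null, so decode it only when it really is a byte[]
String content = (config_content instanceof byte[])
? new String((byte[]) config_content)
: String.valueOf(config_content);
return "Config [config_name=" + config_name + ", config_content=" + content + "]";
}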
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((config_content == null) ? 0 : config_content.hashCode());
result = prime * result
+ ((config_name == null) ? 0 : config_name.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Config other = (Config) obj;
if (config_content == null) {
if (other.config_content != null)
return false;
} else if (!config_content.equals(other.config_content))
return false;
if (config_name == null) {
if (other.config_name != null)
return false;
} else if (!config_name.equals(other.config_name))
return false;
return true;
}
}
Note: in this entity class, config_content is typed as Object for extensibility. It can be a plain String or a byte[], so a configuration file can be converted to a byte[] and stored in the database.
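Since config_content may hold either type, code that prints or logs it has to check the runtime type first. The sketch below assumes a hypothetical helper class, `ConfigContentText`, and assumes the bytes are UTF-8 text. Its `main` method also shows one way to turn a file into the byte[] form, given a file path as the first argument.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical helper for rendering a config_content value as text. */
final class ConfigContentText {

    /** Returns a printable form of the content, whatever its runtime type is. */
    static String describe(Object content) {
        if (content == null) {
            return "null";
        }
        if (content instanceof byte[]) {
            // Assumption: the stored bytes are UTF-8 encoded text
            return new String((byte[]) content, StandardCharsets.UTF_8);
        }
        return content.toString();
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0) {
            // Read the whole file into a byte[], the form suggested in the note above
            byte[] fileBytes = Files.readAllBytes(Paths.get(args[0]));
            System.out.println(describe(fileBytes));
        }
        System.out.println(describe("plain string content"));
    }
}
```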
4.4 Utility Classes
ProjectDBinfoConfigurer.java
package com.zhuyun.user.util;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
public class ProjectDBinfoConfigurer extends PropertyPlaceholderConfigurer
{
private static Map<String, String> ctxPropertiesMap;
private static final Logger LOGGER = LoggerFactory.getLogger(ProjectDBinfoConfigurer.class);
public ProjectDBinfoConfigurer() {
}
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException {
super.processProperties(beanFactoryToProcess, props);
LOGGER.info("loading external config file's content: {}", props.keySet());
ctxPropertiesMap = new HashMap<String, String>();
for (Object key : props.keySet()) {
String keyStr = key.toString();
String value = props.getProperty(keyStr);
LOGGER.info("property: key={}, value={} ", keyStr, value);
ctxPropertiesMap.put(keyStr, value);
}
}
public static String getContextProperty(String name) {
return ctxPropertiesMap.get(name);
}
}
MyZkSerializer.java, the serializer class for zkClient
package com.zhuyun.ConfigCenter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
public class MyZkSerializer implements ZkSerializer
{
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
Object object = inputStream.readObject();
inputStream.close();
return object;
} catch (ClassNotFoundException e) {
throw new ZkMarshallingError("Unable to find object class.", e);
} catch (IOException e) {
throw new ZkMarshallingError(e);
}
}
@Override
public byte[] serialize(Object serializable) throws ZkMarshallingError {
try {
ByteArrayOutputStream byteArrayOS = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(byteArrayOS);
stream.writeObject(serializable);
stream.close();
return byteArrayOS.toByteArray();
} catch (IOException e) {
throw new ZkMarshallingError(e);
}
}
}
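One consequence of this serializer is worth spelling out: the znode does not hold the raw configuration bytes but a Java serialization stream wrapping them, so every reader has to deserialize the data the same way. The following sketch uses only the JDK to show this. `JavaSerializationRoundTrip` is a hypothetical class, and it wraps checked exceptions in unchecked ones purely to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical demo of the same Java serialization that MyZkSerializer relies on. */
final class JavaSerializationRoundTrip {

    /** Serializes a value with ObjectOutputStream, as MyZkSerializer.serialize does. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads the value back with ObjectInputStream, as MyZkSerializer.deserialize does. */
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unable to find object class.", e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = "<push_engine/>".getBytes(StandardCharsets.UTF_8);
        byte[] stored = serialize(raw);
        byte[] back = (byte[]) deserialize(stored);
        // The stored form carries a serialization header, so it is longer than the raw bytes
        System.out.println("raw length: " + raw.length + ", stored length: " + stored.length);
        System.out.println("round trip equal: " + Arrays.equals(raw, back));
    }
}
```

A client written in another language, or one that reads the znode without this serializer, would see the serialization header in front of the XML.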
ZkConfigUtil.java
package com.zhuyun.ConfigCenter;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zhuyun.entity.Config;
public class ZkConfigUtil {
public static Logger LOG = LoggerFactory.getLogger(ZkConfigUtil.class);
/**
* Synchronizes a config to zookeeper
*/
public static void syncConfigToZk(String zkAddr, Config config){
ZkClient zk = new ZkClient(zkAddr);
zk.setZkSerializer(new MyZkSerializer());
if(!zk.exists("/config-center/" + config.getConfig_name())){
zk.createPersistent("/config-center/" + config.getConfig_name(),true);
}
zk.writeData("/config-center/" + config.getConfig_name(), config.getConfig_content());
zk.close();
LOG.info(".... synchronize {} to zookeeper....", config);
}
/**
* Deletes the config node from zookeeper
*/
public static void delConfigToZk(String zkAddr, String config_name){
ZkClient zk = new ZkClient(zkAddr);
zk.setZkSerializer(new MyZkSerializer());
if(zk.exists("/config-center/" + config_name)){
zk.delete("/config-center/" + config_name);
}
zk.close();
LOG.info(".... delete config {} from zookeeper....", config_name);
}
}
4.5 Main Class
Main.java
package com.zhuyun.main;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.zhuyun.ConfigCenter.ZkConfigUtil;
import com.zhuyun.dao.ConfigDao;
import com.zhuyun.entity.Config;
import com.zhuyun.user.util.ProjectDBinfoConfigurer;
public class Main {
public static ConfigDao configDao;
public static String zkAddr;
public static List<Config> allConfigs;
public static Logger LOG = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
configDao = (ConfigDao) context.getBean("configDaoImpl");
zkAddr = ProjectDBinfoConfigurer.getContextProperty("zookeeper.address");
allConfigs = configDao.findAll();
for (Config config : allConfigs) {
ZkConfigUtil.syncConfigToZk(zkAddr, config); // synchronize the config to zookeeper
}
LOG.info(".... config_center start success ....");
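// Block the main thread so the process keeps running.
// Thread.sleep takes milliseconds, so each iteration lasts about 8.76 hours, and the loop then sleeps again.
while (true) {
try {
Thread.sleep(365*24*3600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}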
}
}
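5. Results
Start the main method
Use ZooInspector to view the dubbo service registered on zookeeper
Call the upload-config interface through a dubbo client
The configuration file config.xml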
<?xml version="1.0" encoding="UTF-8"?>
<push_engine>
<property>
<name>hello world</name>
<age>22</age>
</property>
</push_engine>
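At this point, the data on the corresponding zookeeper node changes
MongoDB stores the content of the configuration file in binary form
6. Client Code
The client here is the sub-application from the flow diagram above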
package com.zhuyun.client.test;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import com.zhuyun.ConfigCenter.MyZkSerializer;
public class ZkGetConfigClient {
private Object config_content;
public Object getConfig() {
String zkAddr = "192.168.10.201:2181";
ZkClient zk = new ZkClient(zkAddr);
zk.setZkSerializer(new MyZkSerializer());
config_content = zk.readData("/config-center/" + "push_engine");
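// How the byte array is parsed depends on the application's needs (LoadXML is a helper class not shown in this article)
LoadXML.loadXML(((byte[])config_content));
// Listen for changes to the configuration
zk.subscribeDataChanges("/config-center/" + "push_engine", new IZkDataListener(){
@Override
public void handleDataChange(String arg0, Object arg1)
throws Exception {
config_content = arg1;
// Decode the bytes before printing; calling toString() on a byte[] would only print its identity
System.out.println("Config change detected: " + new String((byte[])config_content, java.nio.charset.StandardCharsets.UTF_8));
LoadXML.loadXML((byte[])config_content);
}
@Override
public void handleDataDeleted(String arg0) throws Exception {
config_content = null;
System.out.println("Config deletion detected");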
}
});
return config_content;
}
public static void main(String[] args) {
ZkGetConfigClient client = new ZkGetConfigClient();
client.getConfig();
while (true) {
try {
Thread.sleep(365*24*3600);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
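The client calls `LoadXML.loadXML`, which the article does not show. As a rough stand-in, the sketch below parses the byte array with the JDK's built-in DOM parser and collects the child elements of the first `<property>` element. The class name `LoadXmlSketch`, the returned `Map`, and the choice of the JDK parser (the pom lists dom4j, which the original helper may well use) are all assumptions.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/** Hypothetical stand-in for the LoadXML helper used by the client. */
final class LoadXmlSketch {

    /** Parses the XML bytes and returns the children of the first <property> as name/value pairs. */
    static Map<String, String> loadXML(byte[] xmlBytes) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new ByteArrayInputStream(xmlBytes));
            Map<String, String> values = new LinkedHashMap<>();
            NodeList properties = doc.getElementsByTagName("property");
            if (properties.getLength() == 0) {
                return values;
            }
            NodeList children = properties.item(0).getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                // Skip whitespace text nodes; keep only real elements
                if (child.getNodeType() == Node.ELEMENT_NODE) {
                    values.put(child.getNodeName(), child.getTextContent().trim());
                }
            }
            return values;
        } catch (Exception e) {
            throw new IllegalArgumentException("Config content could not be parsed as XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
                + "<push_engine><property><name>hello world</name><age>22</age></property></push_engine>";
        System.out.println(loadXML(xml.getBytes(StandardCharsets.UTF_8)));
    }
}
```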
With that, a simple configuration center is complete.