开始的架构图已经有写得非常漂亮的。我这里就不重新再编辑了。java 版本实现job server和worker从第6点开始,红色部分是我遇到的一些坑。。。

1, 环境

CentOS 64位, MySQL, Redis, Java

mysql安装通过lnmp进行安装。所以默认安装路径为/usr/local/mysql

2, Redis简介

Redis是一个开源的K-V内存数据库,它的key可以是string/set/hash/list/...,因为是基于内存的,所在访问速度相当快。

3, Gearman简介

Gearman是一个开源的Map/Reduce分布式计算框架,具有丰富的client sdk,而且它支持MySQL UDF。

Gearman工作图

Gearman工作图

Gearman调用流程

Gearman调用流程

Gearman集群

从图中可以看出貌似Gearman的集群功能比较弱,Job Server之间没啥关系好像。
Gearman集群

4, MySQL - Redis配合使用方案

MySQL - Redis配合使用方案

首先我们以MySQL数据为主,将insert/update/delete交给MySQL,而select交给redis;
当有数据发生变化时,通过MySQL Trigger实时异步调用Gearman的UDF提交一个job给Job Server,当job执行的时候会去更新redis;从而保证redis与MySQL中的数据是同步的。

 

5, 软件安装

安装gearman

//安装依赖
$ yum install -y boost-devel gperf libevent-devel libuuid-devel

//下载gearman  2015-12-10来看都是最新版本
$ wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz
$ tar zxvf gearmand-1.1.12.tar.gz

//编译安装,指定mysqlclient的链接路径
$ ./configure
$ make
$ make install

//启动gearmand服务端 (启动之时,在/var/log/下创建gearmand.log日志文件。-l 指定日志文件  -d后台运行 -L 0.0.0.0 绑定到IPV4)
$ /usr/local/sbin/gearmand -L 0.0.0.0 -l /var/log/gearmand.log -d

//查看是否启动成功[root@iZ94uyronrjZ mysql]# ps -ef | grep gearman
root     31142     1  0 17:06 ?        00:00:00 /usr/local/sbin/gearmand -L 0.0.0.0 -l /znwx/logs/gearmand/gearmand.log -d
root     31174 30507  0 17:40 pts/0    00:00:00 grep gearman//查看是否安装成功,查看gearman版本
$ /usr/local/sbin/gearmand -V

 

 

5, MySQL UDF + Trigger同步数据到Gearman

安装lib_mysqludf_json

 

lib_mysqludf_json可以把MySQL表的数据以json数据格式输出

//下载lib_mysqludf_json(地址:https://github.com/mysqludf)
$ cd lib_mysqludf_json
先删除 lib_mysqludf_json.so
//编译 mysql_config 这是mysql的配置文件,可以 find /usr -name mysql_config 搜索下在什么位置
$ gcc $(/usr/local/mysql/bin/mysql_config  --cflags) -shared -fPIC -o lib_mysqludf_json.so lib_mysqludf_json.c

//拷贝lib_mysqludf_json.so到MySQL的plugin目录
//可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置
$ cp lib_mysqludf_json.so /usr/lib64/mysql/plugin/

//演示lib_mysqludf_json功能
$ mysql -uname -hhost -ppwd
//首先注册UDF函数
mysql> CREATE FUNCTION json_object RETURNS STRING 
       SONAME "lib_mysqludf_json.so";
//json_array|json_members|json_values函数注册方式与json_object一样.
mysql> use test;
mysql> select * from user_list;
+------+----------+
| NAME | PASSWORD |
+------+----------+
| troy | pwd      |
+------+----------+
mysql> select json_object(name,password) as user from user_list;
+----------------------------------+
| user                             |
+----------------------------------+
| {"name":"troy","password":"pwd"} |
+----------------------------------+

安装gearman-mysql-udf

//下载 2015年12月10日已知最新版本
$ wget https://launchpad.net/gearman-mysql-udf/trunk/0.6/+download/gearman-mysql-udf-0.6.tar.gz
$ tar zxvf gearman-mysql-udf-0.6.tar.gz
$ cd gearman-mysql-udf-0.6

//安装libgearman-devel
$ yum install libgearman-devel

//编译安装

//可以登陆MySQL,输入命令"show variables like '%plugin%'"查看plugin位置, mysql_config的配置文件,以及插件库所在路径,编译之后会在此路径生成.so文件

 

./configure --with-mysql=/usr/local/mysql/bin/mysql_config --libdir=/usr/local/mysql/lib/plugin/

$ make && make install //登录MySQL注册UDF函数 mysql> CREATE FUNCTION gman_do_background RETURNS STRING SONAME "libgearman_mysql_udf.so"; mysql> CREATE FUNCTION gman_servers_set RETURNS STRING SONAME "libgearman_mysql_udf.so"; //函数gman_do|gman_do_high|gman_do_low|gman_do_high_background|gman_do_low_background|gman_sum注册方式类似,请参考gearman-mysql-udf-0.6/README //指定gearman job server地址 mysql> SELECT gman_servers_set('127.0.0.1:4730');

如果出现异常信息:
ERROR 1126 (HY000): Can't open shared library 'libgearman_mysql_udf.so' (errno: 11 libgearman.so.8: cannot open shared object file: No such file or directory)
表示系统找不到 libgearman.so 文件,一般so都在/usr/local/lib目录下,修改配置文件/etc/ld.so.conf,将/usr/local/lib目录加入进去即可:

$ cat /etc/ld.so.conf
include ld.so.conf.d/*.conf
/usr/local/lib
$ /sbin/ldconfig -v | grep gearman*

 

 

 

MySQL Trigger调用Gearman UDF实现同步

DELIMITER $$
CREATE TRIGGER user_list_data_to_redis AFTER UPDATE ON user_list
  FOR EACH ROW BEGIN
    SET @ret=gman_do_background('MySQLToRedis', json_object(NEW.name as 'name', NEW.password as 'password')); 
  END$$
DELIMITER ;

 

 

 

6、这里才是重点:java实现job server 和worker或者单worker,网上都是一些php的。可恶啊。。。。。

我上代码了:

 

/**
 * mysql 同步到 redis 的工具类。连接远程gearman job server
 * 实现原理 
 * mysql_udf  >>>>>  gearman job server>>>> gearm worker(本类) >>>>> redis
 * @author 堼宸落宇 2015年12月10日
 *
 */
@Component
public class MysqlToRedisWorker implements GearmanFunction{
	private static final Logger log = LoggerFactory.getLogger(MysqlToRedisWorker.class);
	
	public static final String ECHO_FUNCTION_NAME = "MySQLToRedis";
	
	@Autowired
	private Config config;
    
	/**
	 * 连接其他jobserver用的。
	 */
	public void startWorker() {
		Gearman gearman = Gearman.createGearman();  //创建gearman对象,无论是client,worker都是由这个对象产生的
		log.info("MysqlToRedisWorkder connection:"+config.getEchoHost()+":"+config.getEchoPort()+",function:"+MysqlToRedisWorker.ECHO_FUNCTION_NAME);
		GearmanServer server = gearman.createGearmanServer(config.getEchoHost(), config.getEchoPort());  //创建gearman server,主要是server地址和端口

        GearmanWorker worker = gearman.createGearmanWorker();  //正题来了,创建work节点。
        worker.setReconnectPeriod(2, TimeUnit.SECONDS);  //设置超时重连时间
        worker.setMaximumConcurrency(5);  //最大并发数

        worker.addFunction(<span style="color:#cc0000;">ECHO_FUNCTION_NAME</span>, this);  //添加function方法
        worker.addServer(server);  //将work添加到server中
        log.info("MysqlToRedisWorkder is started!!!!");
	}
	
	@Override
	public byte[] work(String func, byte[] data, GearmanFunctionCallback callback)
			throws Exception {
		
		log.info("收到mysql的数据:::"+new String(data));
		System.out.println("@@@@@ " + new String(data));  //byte[] data是从client传递来的参数,我们将参数增加@@@@字符串后,原封不动返回
        return data;
	}


说明以及问题:此类采用了gearman官网的java-gearman-service(地址:https://code.google.com/p/java-gearman-service/),目前release版本是0.6.6。java-gearman-servic.jar包中,即包括gearman server,还包括client和work客户端API,此包在谷歌上。我这里上传了一份:http://download.csdn.net/detail/u012386696/9343319.

 

问题:config类为spring注入的配置文件类,在worker.addFunction中,如果通过config类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是OK的。此类连接远程的gearman job server.

 

jar包需要添加到本地jar仓库:

mvn install:install-file -Dfile=C:\software\java-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar

 

7、创建一个job server并注册worker,本地服务。

 

/**
 * 创建一个job server 服务器,并且注册一个worker。
 * @author <span style="font-family: tahoma, helvetica, arial;">堼宸落宇</span><span style="font-family: tahoma, helvetica, arial;">  2015年12月10日</span>
 *
 */
@Component
public class MysqlToRedisJobServer {
	private static final Logger log = LoggerFactory.getLogger(MysqlToRedisJobServer.class);
	
	@Autowired
	MysqlToRedisWorker mysqlToRedisWorker;
	@Autowired
	Config config;
	
	public void startJobServer() throws Exception{
		 /*
         * Create a Gearman instance
         */
        Gearman gearman = Gearman.createGearman();

        try {

                /*
                 * Start a new job server. The resulting server will be running in
                 * the local address space.
                 * 
                 * Parameter 1: The port number to listen on
                 * 
                 * throws IOException
                 */
                GearmanServer server = gearman.startGearmanServer(config.getEchoPort());
                
                if (server!=null) {
					log.info("Start gearm jobServer with java !funcname:"+MysqlToRedisWorker.ECHO_FUNCTION_NAME+" port:"+config.getEchoPort());
				}
                
                /*
                 * Create a gearman worker. The worker poll jobs from the server and
                 * executes the corresponding GearmanFunction
                 */
                GearmanWorker worker = gearman.createGearmanWorker();

                /*
                 *  Tell the worker how to perform the echo function
                 */
                worker.addFunction(MysqlToRedisWorker.ECHO_FUNCTION_NAME, mysqlToRedisWorker);

                /*
                 *  Tell the worker that it may communicate with the given job server
                 */
                worker.addServer(server);

        } catch (IOException ioe) {

                /*
                 * If an exception occurs, make sure the gearman service is shutdown
                 */
                gearman.shutdown();

                // forward exception
                throw ioe;
        }
	}


说明以及问题:这里创建一个job server,并设置监听端口。。

 

 

到这里就完成了java版本的mysql到redis的复制。可没看到复制到redis中啊。worker类的work方法就是从mysql_udf中传过来的,这里就随便怎么玩了,不是么。。。  备注  谨记 2天的成果

2019年01月13日更新:这个方法建议还是仅仅作为学习使用,线上环境还有太多需要关注的地方,特别是高可用,在出现备份延迟之后等应该怎么处理都是比较麻烦的。可以考虑一下利用canal来进行基于binlog的复制(同mysql自己的主备复制)。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐