阅读本文之前建议阅读这篇文章,canal单机模式:http://blog.csdn.net/hackerwin7/article/details/37923607


机器准备:

mysql:192.168.213.41:3306

canal server:192.168.213.42:11111 、192.168.213.43:11111

zookeeper:192.168.213.44:4180、192.168.213.45:4180、192.168.213.46:4180

安装与配置:

安装配置mysql-》运程登录这个节点

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. ssh root@192.168.213.41  

可以apt-get  或者  源码安装都可以

配置my.cnf

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. [mysqld]    
  2. log-bin=mysql-bin #添加这一行就ok    
  3. binlog-format=ROW #选择row模式    
  4. server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复    

新建canal用户,并赋予权限

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. CREATE USER canal IDENTIFIED BY 'canal';      
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    
  3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    
  4. FLUSH PRIVILEGES;   

关于配置mysql用户密码问题:http://blog.csdn.net/hackerwin7/article/details/38040057


安装配置zookeeper-》下载zookeeper包  http://zookeeper.apache.org/releases.html#download

上传到远程节点

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scp zookeeper.tar.gz root@192.168.213.44  
  2. scp zookeeper.tar.gz root@192.168.213.45  
  3. scp zookeeper.tar.gz root@192.168.213.46  

登录远程节点(其他两个节点的擦作大同小异)

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. ssh root@192.168.213.44  

解压包,配置,在cnof 目录新建zoo.cfg

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. tickTime=2000      
  2. initLimit=5      
  3. syncLimit=2      
  4. dataDir=/home/canal/servers/zookeeper/data      
  5. dataLogDir=/home/canal/servers/zookeeper/logs      
  6. clientPort=4180  
  7. server.44=192.168.213.44:2888:3888    
  8. server.45=192.168.213.45:2888:3888      
  9. server.46=192.168.213.46:2888:3888  

在目录上述dataDir的目录中创建myid, 将上述的三个数,server.X(我这里的X是44,45,46),中的X值写入myid文件,三台节点的myid这是不一样的哈。


安装canal-》 本机去下载canal的tar包 然后解压,地址:https://github.com/alibaba/canal/releases 。

本机分别上传包到两个远程节点

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scp canal.tar.gz root@192.168.213.42:/home/canal/  

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scp canal.tar.gz root@192.168.213.43:~/  
现在开始配置两个远程节点的canal

canal.properties:

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. # 用逗号隔开 且 不留空格  
  2. canal.zkServers=192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180  
  3. canal.instance.global.spring.xml = classpath:spring/default-instance.xml  
instance.properties:

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可  
  2. canal.instance.master.address = 192.168.213.41:3306  

启动运行:

1、mysql 启动与运行

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. /etc/init.d/mysqld restart #各版本mysql 启动命令不太相同  
2、zookeeper 启动与运行

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. # 三个节点都要启动  
  2. zookeeper/bin/zkServer.sh start  
然后在在三个节点中,随意取一个运行客户端

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180  
3、canal 启动与运行

两个节点都启动canal

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. ./canal/bin/startup.sh  

状态检测:

进入任一个 zookeeper 节点,进入zookeeper客户端

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180  

里面有get ls等命令来获取相应节点信息

[html]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. # 获取正在运行的canal server  
  2. get /otter/canal/destinations/example/running  
  3. # 获取正在连接的canal client  
  4. get /otter/canal/destinations/example/1001/running  
  5. # 获取当前最后一次消费车成功的binlog  
  6. get /otter/canal/destinations/example/1001/cursor  

现在我们来运行client 客户端 (见本文顶部的单机canal文章)

然后我们链接远程mysql,在mysql执行相关insert creat 等语句

然后观察client客户端的具体抓取信息。

HA模式切换:

现在我们来将通过zookeeper获取正在运行的canal server,然后我们将当前运行的canal server 正常关闭掉,我们可以通过zookeeper看到另一台canal server会成为正在运行的canal server,这就是HA模式的自动切换。这些都可以通过zookeeper查询到状态信息。



本系列下篇文章:canal HA 的异常切换与 client的数据抓取。

直接在这里附上执行了,注意循环try-catch的问题:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. /** 
  2.  * Created by hp on 14-7-17. 
  3.  */  
  4. import java.net.InetSocketAddress;  
  5. import java.util.List;  
  6.   
  7. import com.alibaba.otter.canal.client.CanalConnector;  
  8. import com.alibaba.otter.canal.common.utils.AddressUtils;  
  9. import com.alibaba.otter.canal.protocol.Message;  
  10. import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
  11. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
  12. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
  13. import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
  14. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
  15. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
  16. import com.alibaba.otter.canal.protocol.exception.*;  
  17.   
  18.   
  19. import com.alibaba.otter.canal.client.*;  
  20. import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;  
  21. import org.jetbrains.annotations.NotNull;  
  22.   
  23. public class ClientSample {  
  24.   
  25.     public static void main(String args[]) {  
  26.         // 创建链接  
  27.         CanalConnector connector = CanalConnectors.newClusterConnector("192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180""example""""");  
  28.   
  29.         int batchSize = 1;  
  30.         int emptyCount = 0;  
  31.         while(true) {  
  32.             try {  
  33.                 connector.connect();  
  34.                 connector.subscribe(".*\\..*");  
  35.                 while(true) {  
  36.                     Message messages = connector.getWithoutAck(1000);  
  37.                     long bachId = messages.getId();  
  38.                     int size = messages.getEntries().size();  
  39.                     if(bachId == -1 || size == 0) {  
  40.                         try {  
  41.                             Thread.sleep(1000);  
  42.                         } catch (InterruptedException e) {  
  43.                             e.printStackTrace();  
  44.                         }  
  45.                         System.out.println("No DATA!!!!!!!!!!!!!!!!!!!!!!!!");  
  46.                     } else {  
  47.                         printEntry(messages.getEntries());  
  48.                     }  
  49.                 }  
  50.             } catch (Exception e) {  
  51.                 System.out.println("============================================================connect crash");  
  52.             } finally {  
  53.                 connector.disconnect();  
  54.             }  
  55.         }  
  56.     }  
  57.   
  58.     private static void printEntry(@NotNull List<Entry> entrys) {  
  59.         for (Entry entry : entrys) {  
  60.             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
  61.                 continue;  
  62.             }  
  63.   
  64.             RowChange rowChage = null;  
  65.             try {  
  66.                 rowChage = RowChange.parseFrom(entry.getStoreValue());  
  67.             } catch (Exception e) {  
  68.                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
  69.                         e);  
  70.             }  
  71.   
  72.             EventType eventType = rowChage.getEventType();  
  73.             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
  74.                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
  75.                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
  76.                     eventType));  
  77.   
  78.             for (RowData rowData : rowChage.getRowDatasList()) {  
  79.                 if (eventType == EventType.DELETE) {  
  80.                     printColumn(rowData.getBeforeColumnsList());  
  81.                 } else if (eventType == EventType.INSERT) {  
  82.                     printColumn(rowData.getAfterColumnsList());  
  83.                 } else {  
  84.                     System.out.println("-------> before");  
  85.                     printColumn(rowData.getBeforeColumnsList());  
  86.                     System.out.println("-------> after");  
  87.                     printColumn(rowData.getAfterColumnsList());  
  88.                 }  
  89.             }  
  90.         }  
  91.     }  
  92.   
  93.     private static void printColumn(@NotNull List<Column> columns) {  
  94.         for (Column column : columns) {  
  95.             System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
  96.         }  
  97.     }  
  98. }  

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐