阅读本文之前建议阅读这篇文章,canal单机模式:http://blog.csdn.net/hackerwin7/article/details/37923607
机器准备:
mysql:192.168.213.41:3306
canal server:192.168.213.42:11111 、192.168.213.43:11111
zookeeper:192.168.213.44:4180、192.168.213.45:4180、192.168.213.46:4180
安装与配置:
安装配置mysql-》运程登录这个节点
可以apt-get 或者 源码安装都可以
配置my.cnf
- [mysqld]
- log-bin=mysql-bin #添加这一行就ok
- binlog-format=ROW #选择row模式
- server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
新建canal用户,并赋予权限
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
关于配置mysql用户密码问题:http://blog.csdn.net/hackerwin7/article/details/38040057
安装配置zookeeper-》下载zookeeper包 http://zookeeper.apache.org/releases.html#download
上传到远程节点
- scp zookeeper.tar.gz root@192.168.213.44
- scp zookeeper.tar.gz root@192.168.213.45
- scp zookeeper.tar.gz root@192.168.213.46
登录远程节点(其他两个节点的擦作大同小异)
解压包,配置,在cnof 目录新建zoo.cfg
- tickTime=2000
- initLimit=5
- syncLimit=2
- dataDir=/home/canal/servers/zookeeper/data
- dataLogDir=/home/canal/servers/zookeeper/logs
- clientPort=4180
- server.44=192.168.213.44:2888:3888
- server.45=192.168.213.45:2888:3888
- server.46=192.168.213.46:2888:3888
在目录上述dataDir的目录中创建myid, 将上述的三个数,server.X(我这里的X是44,45,46),中的X值写入myid文件,三台节点的myid这是不一样的哈。
安装canal-》 本机去下载canal的tar包 然后解压,地址:https://github.com/alibaba/canal/releases 。
本机分别上传包到两个远程节点
- scp canal.tar.gz root@192.168.213.42:/home/canal/
- scp canal.tar.gz root@192.168.213.43:~/
现在开始配置两个远程节点的canal
canal.properties:
- # 用逗号隔开 且 不留空格
- canal.zkServers=192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180
- canal.instance.global.spring.xml = classpath:spring/default-instance.xml
instance.properties:
- canal.instance.mysql.slaveId = 1234 ##另外一台机器改成1235,保证slaveId不重复即可
- canal.instance.master.address = 192.168.213.41:3306
启动运行:
1、mysql 启动与运行
- /etc/init.d/mysqld restart #各版本mysql 启动命令不太相同
2、zookeeper 启动与运行
- # 三个节点都要启动
- zookeeper/bin/zkServer.sh start
然后在在三个节点中,随意取一个运行客户端
- ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180
3、canal 启动与运行
两个节点都启动canal
状态检测:
进入任一个 zookeeper 节点,进入zookeeper客户端
- ./zookeeper/bin/zkCli.sh -server 192.168.213.44:4180
里面有get ls等命令来获取相应节点信息
- # 获取正在运行的canal server
- get /otter/canal/destinations/example/running
- # 获取正在连接的canal client
- get /otter/canal/destinations/example/1001/running
- # 获取当前最后一次消费车成功的binlog
- get /otter/canal/destinations/example/1001/cursor
现在我们来运行client 客户端 (见本文顶部的单机canal文章)
然后我们链接远程mysql,在mysql执行相关insert creat 等语句
然后观察client客户端的具体抓取信息。
HA模式切换:
现在我们来将通过zookeeper获取正在运行的canal server,然后我们将当前运行的canal server 正常关闭掉,我们可以通过zookeeper看到另一台canal server会成为正在运行的canal server,这就是HA模式的自动切换。这些都可以通过zookeeper查询到状态信息。
本系列下篇文章:canal HA 的异常切换与 client的数据抓取。
直接在这里附上执行了,注意循环try-catch的问题:
-
-
-
- import java.net.InetSocketAddress;
- import java.util.List;
-
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- import com.alibaba.otter.canal.protocol.Message;
- import com.alibaba.otter.canal.protocol.CanalEntry.Column;
- import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
- import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
- import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
- import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
- import com.alibaba.otter.canal.protocol.exception.*;
-
-
- import com.alibaba.otter.canal.client.*;
- import com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor;
- import org.jetbrains.annotations.NotNull;
-
- public class ClientSample {
-
- public static void main(String args[]) {
-
- CanalConnector connector = CanalConnectors.newClusterConnector("192.168.213.44:4180,192.168.213.45:4180,192.168.213.46:4180", "example", "", "");
-
- int batchSize = 1;
- int emptyCount = 0;
- while(true) {
- try {
- connector.connect();
- connector.subscribe(".*\\..*");
- while(true) {
- Message messages = connector.getWithoutAck(1000);
- long bachId = messages.getId();
- int size = messages.getEntries().size();
- if(bachId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("No DATA!!!!!!!!!!!!!!!!!!!!!!!!");
- } else {
- printEntry(messages.getEntries());
- }
- }
- } catch (Exception e) {
- System.out.println("============================================================connect crash");
- } finally {
- connector.disconnect();
- }
- }
- }
-
- private static void printEntry(@NotNull List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
-
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
-
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
- private static void printColumn(@NotNull List<Column> columns) {
- for (Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
- }
所有评论(0)